You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/02/01 18:47:51 UTC
[15/51] [abbrv] [partial] brooklyn-library git commit: move subdir
from incubator up a level as it is promoted to its own repo (first
non-incubator commit!)
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
deleted file mode 100644
index 4396ed1..0000000
--- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
+++ /dev/null
@@ -1,597 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.nosql.couchbase;
-
-import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.Nonnull;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.policy.PolicySpec;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.core.config.render.RendererHints;
-import org.apache.brooklyn.core.effector.Effectors;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.EntityInternal;
-import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.core.location.access.BrooklynAccessUtils;
-import org.apache.brooklyn.core.sensor.DependentConfiguration;
-import org.apache.brooklyn.enricher.stock.Enrichers;
-import org.apache.brooklyn.entity.group.AbstractMembershipTrackingPolicy;
-import org.apache.brooklyn.entity.group.DynamicClusterImpl;
-import org.apache.brooklyn.entity.software.base.SoftwareProcess;
-import org.apache.brooklyn.feed.http.HttpFeed;
-import org.apache.brooklyn.feed.http.HttpPollConfig;
-import org.apache.brooklyn.feed.http.HttpValueFunctions;
-import org.apache.brooklyn.feed.http.JsonFunctions;
-import org.apache.brooklyn.util.http.HttpToolResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.util.collections.CollectionFunctionals;
-import org.apache.brooklyn.util.collections.MutableSet;
-import org.apache.brooklyn.util.collections.QuorumCheck;
-import org.apache.brooklyn.util.core.task.DynamicTasks;
-import org.apache.brooklyn.util.core.task.TaskBuilder;
-import org.apache.brooklyn.util.core.task.Tasks;
-import org.apache.brooklyn.util.exceptions.Exceptions;
-import org.apache.brooklyn.util.guava.Functionals;
-import org.apache.brooklyn.util.guava.IfFunctions;
-import org.apache.brooklyn.util.math.MathPredicates;
-import org.apache.brooklyn.util.text.ByteSizeStrings;
-import org.apache.brooklyn.util.text.StringFunctions;
-import org.apache.brooklyn.util.text.Strings;
-import org.apache.brooklyn.util.time.Duration;
-import org.apache.brooklyn.util.time.Time;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.net.HostAndPort;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-
-public class CouchbaseClusterImpl extends DynamicClusterImpl implements CouchbaseCluster {
-
- /*
- * Refactoring required:
- *
- * Currently, on start() the cluster waits for an arbitrary SERVICE_UP_TIME_OUT (3 minutes) before assuming that a quorate
- * number of servers are available. The servers are then added to the cluster, and a further wait period of
- * DELAY_BEFORE_ADVERTISING_CLUSTER (30 seconds) is used before advertising the cluster
- *
- * DELAY_BEFORE_ADVERTISING_CLUSTER: It should be possible to refactor this away by adding a repeater that will poll
- * the REST API of the primary node (once established) until the API indicates that the cluster is available
- *
- * SERVICE_UP_TIME_OUT: The refactoring of this would be more substantial. One method would be to remove the bulk of the
- * logic from the start() method, and rely entirely on the membership tracking policy and the onServerPoolMemberChanged()
- * method. The addition of a RUNNING sensor on the nodes would allow the cluster to determine that a node is up and
- * running but has not yet been added to the cluster. The IS_CLUSTER_INITIALIZED key could be used to determine whether
- * or not the cluster should be initialized, or a node simply added to an existing cluster. A repeater could be used
- * in the driver's to ensure that the method does not return until the node has been fully added
- *
- * There is an (incomplete) first-pass at this here: https://github.com/Nakomis/incubator-brooklyn/compare/couchbase-running-sensor
- * however, there have been significant changes to the cluster initialization since that work was done so it will probably
- * need to be re-done
- *
- * Additionally, during bucket creation, a HttpPoll is used to check that the bucket has been created. This should be
- * refactored to use a Repeater in CouchbaseNodeSshDriver.bucketCreate() in a similar way to the one employed in
- * CouchbaseNodeSshDriver.rebalance(). Were this done, this class could simply queue the bucket creation tasks
- *
- */
-
- private static final Logger log = LoggerFactory.getLogger(CouchbaseClusterImpl.class);
- private final Object mutex = new Object[0];
- // Used to serialize bucket creation as only one bucket can be created at a time,
- // so a feed is used to determine when a bucket has finished being created
- private final AtomicReference<HttpFeed> resetBucketCreation = new AtomicReference<HttpFeed>();
-
- public void init() {
- log.info("Initializing the Couchbase cluster...");
- super.init();
-
- enrichers().add(
- Enrichers.builder()
- .transforming(COUCHBASE_CLUSTER_UP_NODES)
- .from(this)
- .publishing(COUCHBASE_CLUSTER_UP_NODE_ADDRESSES)
- .computing(new ListOfHostAndPort()).build() );
- enrichers().add(
- Enrichers.builder()
- .transforming(COUCHBASE_CLUSTER_UP_NODE_ADDRESSES)
- .from(this)
- .publishing(COUCHBASE_CLUSTER_CONNECTION_URL)
- .computing(
- IfFunctions.<List<String>>ifPredicate(
- Predicates.compose(MathPredicates.lessThan(getConfig(CouchbaseCluster.INITIAL_QUORUM_SIZE)),
- CollectionFunctionals.sizeFunction(0)) )
- .value((String)null)
- .defaultApply(
- Functionals.chain(
- CollectionFunctionals.<String,List<String>>limit(4),
- StringFunctions.joiner(","),
- StringFunctions.formatter("http://%s/"))) )
- .build() );
-
- Map<? extends AttributeSensor<? extends Number>, ? extends AttributeSensor<? extends Number>> enricherSetup =
- ImmutableMap.<AttributeSensor<? extends Number>, AttributeSensor<? extends Number>>builder()
- .put(CouchbaseNode.OPS, CouchbaseCluster.OPS_PER_NODE)
- .put(CouchbaseNode.COUCH_DOCS_DATA_SIZE, CouchbaseCluster.COUCH_DOCS_DATA_SIZE_PER_NODE)
- .put(CouchbaseNode.COUCH_DOCS_ACTUAL_DISK_SIZE, CouchbaseCluster.COUCH_DOCS_ACTUAL_DISK_SIZE_PER_NODE)
- .put(CouchbaseNode.EP_BG_FETCHED, CouchbaseCluster.EP_BG_FETCHED_PER_NODE)
- .put(CouchbaseNode.MEM_USED, CouchbaseCluster.MEM_USED_PER_NODE)
- .put(CouchbaseNode.COUCH_VIEWS_ACTUAL_DISK_SIZE, CouchbaseCluster.COUCH_VIEWS_ACTUAL_DISK_SIZE_PER_NODE)
- .put(CouchbaseNode.CURR_ITEMS, CouchbaseCluster.CURR_ITEMS_PER_NODE)
- .put(CouchbaseNode.VB_REPLICA_CURR_ITEMS, CouchbaseCluster.VB_REPLICA_CURR_ITEMS_PER_NODE)
- .put(CouchbaseNode.COUCH_VIEWS_DATA_SIZE, CouchbaseCluster.COUCH_VIEWS_DATA_SIZE_PER_NODE)
- .put(CouchbaseNode.GET_HITS, CouchbaseCluster.GET_HITS_PER_NODE)
- .put(CouchbaseNode.CMD_GET, CouchbaseCluster.CMD_GET_PER_NODE)
- .put(CouchbaseNode.CURR_ITEMS_TOT, CouchbaseCluster.CURR_ITEMS_TOT_PER_NODE)
- .build();
-
- for (AttributeSensor<? extends Number> nodeSensor : enricherSetup.keySet()) {
- addSummingMemberEnricher(nodeSensor);
- addAveragingMemberEnricher(nodeSensor, enricherSetup.get(nodeSensor));
- }
-
- enrichers().add(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS)
- .from(IS_CLUSTER_INITIALIZED).computing(
- IfFunctions.ifNotEquals(true).value("The cluster is not yet completely initialized")
- .defaultValue(null).build()).build() );
- }
-
- private void addAveragingMemberEnricher(AttributeSensor<? extends Number> fromSensor, AttributeSensor<? extends Number> toSensor) {
- enrichers().add(Enrichers.builder()
- .aggregating(fromSensor)
- .publishing(toSensor)
- .fromMembers()
- .computingAverage()
- .build()
- );
- }
-
- private void addSummingMemberEnricher(AttributeSensor<? extends Number> source) {
- enrichers().add(Enrichers.builder()
- .aggregating(source)
- .publishing(source)
- .fromMembers()
- .computingSum()
- .build()
- );
- }
-
- @Override
- protected void doStart() {
- sensors().set(IS_CLUSTER_INITIALIZED, false);
-
- super.doStart();
-
- connectSensors();
-
- sensors().set(BUCKET_CREATION_IN_PROGRESS, false);
-
- //start timeout before adding the servers
- Tasks.setBlockingDetails("Pausing while Couchbase stabilizes");
- Time.sleep(getConfig(NODES_STARTED_STABILIZATION_DELAY));
-
- Optional<Set<Entity>> upNodes = Optional.<Set<Entity>>fromNullable(getAttribute(COUCHBASE_CLUSTER_UP_NODES));
- if (upNodes.isPresent() && !upNodes.get().isEmpty()) {
-
- Tasks.setBlockingDetails("Adding servers to Couchbase");
-
- //TODO: select a new primary node if this one fails
- Entity primaryNode = upNodes.get().iterator().next();
- ((EntityInternal) primaryNode).sensors().set(CouchbaseNode.IS_PRIMARY_NODE, true);
- sensors().set(COUCHBASE_PRIMARY_NODE, primaryNode);
-
- Set<Entity> serversToAdd = MutableSet.<Entity>copyOf(getUpNodes());
-
- if (serversToAdd.size() >= getQuorumSize() && serversToAdd.size() > 1) {
- log.info("Number of SERVICE_UP nodes:{} in cluster:{} reached Quorum:{}, adding the servers", new Object[]{serversToAdd.size(), getId(), getQuorumSize()});
- addServers(serversToAdd);
-
- //wait for servers to be added to the couchbase server
- try {
- Tasks.setBlockingDetails("Delaying before advertising cluster up");
- Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER));
- } finally {
- Tasks.resetBlockingDetails();
- }
-
- ((CouchbaseNode)getPrimaryNode()).rebalance();
- } else {
- if (getQuorumSize()>1) {
- log.warn(this+" is not quorate; will likely fail later, but proceeding for now");
- }
- for (Entity server: serversToAdd) {
- ((EntityInternal) server).sensors().set(CouchbaseNode.IS_IN_CLUSTER, true);
- }
- }
-
- if (getConfig(CREATE_BUCKETS)!=null) {
- try {
- Tasks.setBlockingDetails("Creating buckets in Couchbase");
-
- createBuckets();
- DependentConfiguration.waitInTaskForAttributeReady(this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false));
-
- } finally {
- Tasks.resetBlockingDetails();
- }
- }
-
- if (getConfig(REPLICATION)!=null) {
- try {
- Tasks.setBlockingDetails("Configuring replication rules");
-
- List<Map<String, Object>> replRules = getConfig(REPLICATION);
- for (Map<String, Object> replRule: replRules) {
- DynamicTasks.queue(Effectors.invocation(getPrimaryNode(), CouchbaseNode.ADD_REPLICATION_RULE, replRule));
- }
- DynamicTasks.waitForLast();
-
- } finally {
- Tasks.resetBlockingDetails();
- }
- }
-
- sensors().set(IS_CLUSTER_INITIALIZED, true);
-
- } else {
- throw new IllegalStateException("No up nodes available after starting");
- }
- }
-
- @Override
- public void stop() {
- if (resetBucketCreation.get() != null) {
- resetBucketCreation.get().stop();
- }
- super.stop();
- }
-
- protected void connectSensors() {
- policies().add(PolicySpec.create(MemberTrackingPolicy.class)
- .displayName("Controller targets tracker")
- .configure("group", this));
- }
-
- private final static class ListOfHostAndPort implements Function<Set<Entity>, List<String>> {
- @Override public List<String> apply(Set<Entity> input) {
- List<String> addresses = Lists.newArrayList();
- for (Entity entity : input) {
- addresses.add(String.format("%s",
- BrooklynAccessUtils.getBrooklynAccessibleAddress(entity, entity.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT))));
- }
- return addresses;
- }
- }
-
- public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
- @Override protected void onEntityChange(Entity member) {
- ((CouchbaseClusterImpl)entity).onServerPoolMemberChanged(member);
- }
-
- @Override protected void onEntityAdded(Entity member) {
- ((CouchbaseClusterImpl)entity).onServerPoolMemberChanged(member);
- }
-
- @Override protected void onEntityRemoved(Entity member) {
- ((CouchbaseClusterImpl)entity).onServerPoolMemberChanged(member);
- }
- };
-
- protected synchronized void onServerPoolMemberChanged(Entity member) {
- if (log.isTraceEnabled()) log.trace("For {}, considering membership of {} which is in locations {}",
- new Object[]{this, member, member.getLocations()});
-
- //FIXME: make use of servers to be added after cluster initialization.
- synchronized (mutex) {
- if (belongsInServerPool(member)) {
-
- Optional<Set<Entity>> upNodes = Optional.fromNullable(getUpNodes());
- if (upNodes.isPresent()) {
-
- if (!upNodes.get().contains(member)) {
- Set<Entity> newNodes = Sets.newHashSet(getUpNodes());
- newNodes.add(member);
- sensors().set(COUCHBASE_CLUSTER_UP_NODES, newNodes);
-
- //add to set of servers to be added.
- if (isClusterInitialized()) {
- addServer(member);
- }
- }
- } else {
- Set<Entity> newNodes = Sets.newHashSet();
- newNodes.add(member);
- sensors().set(COUCHBASE_CLUSTER_UP_NODES, newNodes);
-
- if (isClusterInitialized()) {
- addServer(member);
- }
- }
- } else {
- Set<Entity> upNodes = getUpNodes();
- if (upNodes != null && upNodes.contains(member)) {
- upNodes.remove(member);
- sensors().set(COUCHBASE_CLUSTER_UP_NODES, upNodes);
- log.info("Removing couchbase node {}: {}; from cluster", new Object[]{this, member});
- }
- }
- if (log.isTraceEnabled()) log.trace("Done {} checkEntity {}", this, member);
- }
- }
-
- protected boolean belongsInServerPool(Entity member) {
- if (!groovyTruth(member.getAttribute(Startable.SERVICE_UP))) {
- if (log.isTraceEnabled()) log.trace("Members of {}, checking {}, eliminating because not up", this, member);
- return false;
- }
- if (!getMembers().contains(member)) {
- if (log.isTraceEnabled())
- log.trace("Members of {}, checking {}, eliminating because not member", this, member);
-
- return false;
- }
- if (log.isTraceEnabled()) log.trace("Members of {}, checking {}, approving", this, member);
-
- return true;
- }
-
-
- protected EntitySpec<?> getMemberSpec() {
- EntitySpec<?> result = super.getMemberSpec();
- if (result != null) return result;
- return EntitySpec.create(CouchbaseNode.class);
- }
-
- @Override
- public int getQuorumSize() {
- Integer quorumSize = getConfig(CouchbaseCluster.INITIAL_QUORUM_SIZE);
- if (quorumSize != null && quorumSize > 0)
- return quorumSize;
- // by default the quorum would be floor(initial_cluster_size/2) + 1
- return (int) Math.floor(getConfig(INITIAL_SIZE) / 2) + 1;
- }
-
- protected int getActualSize() {
- return Optional.fromNullable(getAttribute(CouchbaseCluster.ACTUAL_CLUSTER_SIZE)).or(-1);
- }
-
- private Set<Entity> getUpNodes() {
- return getAttribute(COUCHBASE_CLUSTER_UP_NODES);
- }
-
- private CouchbaseNode getPrimaryNode() {
- return (CouchbaseNode) getAttribute(COUCHBASE_PRIMARY_NODE);
- }
-
- @Override
- protected void initEnrichers() {
- enrichers().add(Enrichers.builder().updatingMap(ServiceStateLogic.SERVICE_NOT_UP_INDICATORS)
- .from(COUCHBASE_CLUSTER_UP_NODES)
- .computing(new Function<Set<Entity>, Object>() {
- @Override
- public Object apply(Set<Entity> input) {
- if (input==null) return "Couchbase up nodes not set";
- if (input.isEmpty()) return "No Couchbase up nodes";
- if (input.size() < getQuorumSize()) return "Couchbase up nodes not quorate";
- return null;
- }
- }).build());
-
- if (config().getLocalRaw(UP_QUORUM_CHECK).isAbsent()) {
- // TODO Only leaving CouchbaseQuorumCheck here in case it is contained in persisted state.
- // If so, need a transformer and then to delete it
- @SuppressWarnings({ "unused", "hiding" })
- @Deprecated
- class CouchbaseQuorumCheck implements QuorumCheck {
- @Override
- public boolean isQuorate(int sizeHealthy, int totalSize) {
- // check members count passed in AND the sensor
- if (sizeHealthy < getQuorumSize()) return false;
- return true;
- }
- }
- config().set(UP_QUORUM_CHECK, new CouchbaseClusterImpl.CouchbaseQuorumCheck(this));
- }
- super.initEnrichers();
- }
-
- static class CouchbaseQuorumCheck implements QuorumCheck {
- private final CouchbaseCluster cluster;
- CouchbaseQuorumCheck(CouchbaseCluster cluster) {
- this.cluster = cluster;
- }
- @Override
- public boolean isQuorate(int sizeHealthy, int totalSize) {
- // check members count passed in AND the sensor
- if (sizeHealthy < cluster.getQuorumSize()) return false;
- return true;
- }
- }
- protected void addServers(Set<Entity> serversToAdd) {
- Preconditions.checkNotNull(serversToAdd);
- for (Entity s : serversToAdd) {
- addServerSeveralTimes(s, 12, Duration.TEN_SECONDS);
- }
- }
-
- /** try adding in a loop because we are seeing spurious port failures in AWS */
- protected void addServerSeveralTimes(Entity s, int numRetries, Duration delayOnFailure) {
- try {
- addServer(s);
- } catch (Exception e) {
- Exceptions.propagateIfFatal(e);
- if (numRetries<=0) throw Exceptions.propagate(e);
- // retry once after sleep because we are getting some odd primary-change events
- log.warn("Error adding "+s+" to "+this+", "+numRetries+" retries remaining, will retry after delay ("+e+")");
- Time.sleep(delayOnFailure);
- addServerSeveralTimes(s, numRetries-1, delayOnFailure);
- }
- }
-
- protected void addServer(Entity serverToAdd) {
- Preconditions.checkNotNull(serverToAdd);
- if (serverToAdd.equals(getPrimaryNode())) {
- // no need to add; but we pass it in anyway because it makes the calling logic easier
- return;
- }
- if (!isMemberInCluster(serverToAdd)) {
- HostAndPort webAdmin = HostAndPort.fromParts(serverToAdd.getAttribute(SoftwareProcess.SUBNET_HOSTNAME),
- serverToAdd.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
- String username = serverToAdd.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME);
- String password = serverToAdd.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD);
-
- if (isClusterInitialized()) {
- Entities.invokeEffectorWithArgs(this, getPrimaryNode(), CouchbaseNode.SERVER_ADD_AND_REBALANCE, webAdmin.toString(), username, password).getUnchecked();
- } else {
- Entities.invokeEffectorWithArgs(this, getPrimaryNode(), CouchbaseNode.SERVER_ADD, webAdmin.toString(), username, password).getUnchecked();
- }
- //FIXME check feedback of whether the server was added.
- ((EntityInternal) serverToAdd).sensors().set(CouchbaseNode.IS_IN_CLUSTER, true);
- }
- }
-
- /** finds the cluster name specified for a node or a cluster,
- * using {@link CouchbaseCluster#CLUSTER_NAME} or falling back to the cluster (or node) ID. */
- public static String getClusterName(Entity node) {
- String name = node.getConfig(CLUSTER_NAME);
- if (!Strings.isBlank(name)) return Strings.makeValidFilename(name);
- return getClusterOrNode(node).getId();
- }
-
- /** returns Couchbase cluster in ancestry, defaulting to the given node if none */
- @Nonnull public static Entity getClusterOrNode(Entity node) {
- Iterable<CouchbaseCluster> clusterNodes = Iterables.filter(Entities.ancestors(node), CouchbaseCluster.class);
- return Iterables.getFirst(clusterNodes, node);
- }
-
- public boolean isClusterInitialized() {
- return Optional.fromNullable(getAttribute(IS_CLUSTER_INITIALIZED)).or(false);
- }
-
- public boolean isMemberInCluster(Entity e) {
- return Optional.fromNullable(e.getAttribute(CouchbaseNode.IS_IN_CLUSTER)).or(false);
- }
-
- public void createBuckets() {
- //TODO: check for port conflicts if buckets are being created with a port
- List<Map<String, Object>> bucketsToCreate = getConfig(CREATE_BUCKETS);
- if (bucketsToCreate==null) return;
-
- Entity primaryNode = getPrimaryNode();
-
- for (Map<String, Object> bucketMap : bucketsToCreate) {
- String bucketName = bucketMap.containsKey("bucket") ? (String) bucketMap.get("bucket") : "default";
- String bucketType = bucketMap.containsKey("bucket-type") ? (String) bucketMap.get("bucket-type") : "couchbase";
- // default bucket must be on this port; other buckets can (must) specify their own (unique) port
- Integer bucketPort = bucketMap.containsKey("bucket-port") ? (Integer) bucketMap.get("bucket-port") : 11211;
- Integer bucketRamSize = bucketMap.containsKey("bucket-ramsize") ? (Integer) bucketMap.get("bucket-ramsize") : 100;
- Integer bucketReplica = bucketMap.containsKey("bucket-replica") ? (Integer) bucketMap.get("bucket-replica") : 1;
-
- createBucket(primaryNode, bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica);
- }
- }
-
- public void createBucket(final Entity primaryNode, final String bucketName, final String bucketType, final Integer bucketPort, final Integer bucketRamSize, final Integer bucketReplica) {
- DynamicTasks.queueIfPossible(TaskBuilder.<Void>builder().displayName("Creating bucket " + bucketName).body(
- new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false));
- if (CouchbaseClusterImpl.this.resetBucketCreation.get() != null) {
- CouchbaseClusterImpl.this.resetBucketCreation.get().stop();
- }
- sensors().set(CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, true);
- HostAndPort hostAndPort = BrooklynAccessUtils.getBrooklynAccessibleAddress(primaryNode, primaryNode.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
-
- CouchbaseClusterImpl.this.resetBucketCreation.set(HttpFeed.builder()
- .entity(CouchbaseClusterImpl.this)
- .period(500, TimeUnit.MILLISECONDS)
- .baseUri(String.format("http://%s/pools/default/buckets/%s", hostAndPort, bucketName))
- .credentials(primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD))
- .poll(new HttpPollConfig<Boolean>(BUCKET_CREATION_IN_PROGRESS)
- .onSuccess(Functionals.chain(HttpValueFunctions.jsonContents(), JsonFunctions.walkN("nodes"), new Function<JsonElement, Boolean>() {
- @Override
- public Boolean apply(JsonElement input) {
- // Wait until bucket has been created on all nodes and the couchApiBase element has been published (indicating that the bucket is useable)
- JsonArray servers = input.getAsJsonArray();
- if (servers.size() != CouchbaseClusterImpl.this.getMembers().size()) {
- return true;
- }
- for (JsonElement server : servers) {
- Object api = server.getAsJsonObject().get("couchApiBase");
- if (api == null || Strings.isEmpty(String.valueOf(api))) {
- return true;
- }
- }
- return false;
- }
- }))
- .onFailureOrException(new Function<Object, Boolean>() {
- @Override
- public Boolean apply(Object input) {
- if (input instanceof HttpToolResponse) {
- if (((HttpToolResponse) input).getResponseCode() == 404) {
- return true;
- }
- }
- if (input instanceof Throwable)
- Exceptions.propagate((Throwable) input);
- throw new IllegalStateException("Unexpected response when creating bucket:" + input);
- }
- }))
- .build());
-
- // TODO: Bail out if bucket creation fails, to allow next bucket to proceed
- Entities.invokeEffectorWithArgs(CouchbaseClusterImpl.this, primaryNode, CouchbaseNode.BUCKET_CREATE, bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica);
- DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false));
- if (CouchbaseClusterImpl.this.resetBucketCreation.get() != null) {
- CouchbaseClusterImpl.this.resetBucketCreation.get().stop();
- }
- return null;
- }
- }
- ).build()).orSubmitAndBlock();
- }
-
- static {
- RendererHints.register(COUCH_DOCS_DATA_SIZE_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric()));
- RendererHints.register(COUCH_DOCS_ACTUAL_DISK_SIZE_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric()));
- RendererHints.register(MEM_USED_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric()));
- RendererHints.register(COUCH_VIEWS_ACTUAL_DISK_SIZE_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric()));
- RendererHints.register(COUCH_VIEWS_DATA_SIZE_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric()));
- }
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNode.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
deleted file mode 100644
index 8bde2f3..0000000
--- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.nosql.couchbase;
-
-import java.net.URI;
-
-import org.apache.brooklyn.api.catalog.Catalog;
-import org.apache.brooklyn.api.entity.ImplementedBy;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.annotation.Effector;
-import org.apache.brooklyn.core.annotation.EffectorParam;
-import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.config.render.RendererHints;
-import org.apache.brooklyn.core.effector.Effectors;
-import org.apache.brooklyn.core.effector.MethodEffector;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey;
-import org.apache.brooklyn.core.sensor.PortAttributeSensorAndConfigKey;
-import org.apache.brooklyn.core.sensor.Sensors;
-import org.apache.brooklyn.entity.software.base.SoftwareProcess;
-import org.apache.brooklyn.util.core.flags.SetFromFlag;
-import org.apache.brooklyn.util.text.ByteSizeStrings;
-
-@Catalog(name="CouchBase Node", description="Couchbase Server is an open source, distributed (shared-nothing architecture) "
- + "NoSQL document-oriented database that is optimized for interactive applications.")
-@ImplementedBy(CouchbaseNodeImpl.class)
-public interface CouchbaseNode extends SoftwareProcess {
-
- @SetFromFlag("adminUsername")
- ConfigKey<String> COUCHBASE_ADMIN_USERNAME = ConfigKeys.newStringConfigKey("couchbase.adminUsername", "Username for the admin user on the node", "Administrator");
-
- @SetFromFlag("adminPassword")
- ConfigKey<String> COUCHBASE_ADMIN_PASSWORD = ConfigKeys.newStringConfigKey("couchbase.adminPassword", "Password for the admin user on the node", "Password");
-
- @SetFromFlag("version")
- ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION,
- "3.0.0");
-
- @SetFromFlag("enterprise")
- ConfigKey<Boolean> USE_ENTERPRISE = ConfigKeys.newBooleanConfigKey("couchbase.enterprise.enabled",
- "Whether to use Couchbase Enterprise; if false uses the community version. Defaults to true.", true);
-
- @SetFromFlag("downloadUrl")
- BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>(
- SoftwareProcess.DOWNLOAD_URL, "http://packages.couchbase.com/releases/${version}/"
- + "couchbase-server-${driver.communityOrEnterprise}${driver.downloadLinkPreVersionSeparator}${version}${driver.downloadLinkOsTagWithPrefix}");
-
- @SetFromFlag("clusterInitRamSize")
- BasicAttributeSensorAndConfigKey<Integer> COUCHBASE_CLUSTER_INIT_RAM_SIZE = new BasicAttributeSensorAndConfigKey<Integer>(
- Integer.class, "couchbase.clusterInitRamSize", "initial ram size of the cluster", 300);
-
- PortAttributeSensorAndConfigKey COUCHBASE_WEB_ADMIN_PORT = new PortAttributeSensorAndConfigKey("couchbase.webAdminPort", "Web Administration Port", "8091+");
- PortAttributeSensorAndConfigKey COUCHBASE_API_PORT = new PortAttributeSensorAndConfigKey("couchbase.apiPort", "Couchbase API Port", "8092+");
- PortAttributeSensorAndConfigKey COUCHBASE_INTERNAL_BUCKET_PORT = new PortAttributeSensorAndConfigKey("couchbase.internalBucketPort", "Internal Bucket Port", "11209");
- PortAttributeSensorAndConfigKey COUCHBASE_INTERNAL_EXTERNAL_BUCKET_PORT = new PortAttributeSensorAndConfigKey("couchbase.internalExternalBucketPort", "Internal/External Bucket Port", "11210");
- PortAttributeSensorAndConfigKey COUCHBASE_CLIENT_INTERFACE_PROXY = new PortAttributeSensorAndConfigKey("couchbase.clientInterfaceProxy", "Client interface (proxy)", "11211");
- PortAttributeSensorAndConfigKey COUCHBASE_INCOMING_SSL_PROXY = new PortAttributeSensorAndConfigKey("couchbase.incomingSslProxy", "Incoming SSL Proxy", "11214");
- PortAttributeSensorAndConfigKey COUCHBASE_INTERNAL_OUTGOING_SSL_PROXY = new PortAttributeSensorAndConfigKey("couchbase.internalOutgoingSslProxy", "Internal Outgoing SSL Proxy", "11215");
- PortAttributeSensorAndConfigKey COUCHBASE_REST_HTTPS_FOR_SSL = new PortAttributeSensorAndConfigKey("couchbase.internalRestHttpsForSsl", "Internal REST HTTPS for SSL", "18091");
- PortAttributeSensorAndConfigKey COUCHBASE_CAPI_HTTPS_FOR_SSL = new PortAttributeSensorAndConfigKey("couchbase.internalCapiHttpsForSsl", "Internal CAPI HTTPS for SSL", "18092");
- PortAttributeSensorAndConfigKey ERLANG_PORT_MAPPER = new PortAttributeSensorAndConfigKey("couchbase.erlangPortMapper", "Erlang Port Mapper Daemon Listener Port (epmd)", "4369");
- PortAttributeSensorAndConfigKey NODE_DATA_EXCHANGE_PORT_RANGE_START = new PortAttributeSensorAndConfigKey("couchbase.nodeDataExchangePortRangeStart", "Node data exchange Port Range Start", "21100+");
- PortAttributeSensorAndConfigKey NODE_DATA_EXCHANGE_PORT_RANGE_END = new PortAttributeSensorAndConfigKey("couchbase.nodeDataExchangePortRangeEnd", "Node data exchange Port Range End", "21199+");
-
- AttributeSensor<Boolean> IS_PRIMARY_NODE = Sensors.newBooleanSensor("couchbase.isPrimaryNode", "flag to determine if the current couchbase node is the primary node for the cluster");
- AttributeSensor<Boolean> IS_IN_CLUSTER = Sensors.newBooleanSensor("couchbase.isInCluster", "flag to determine if the current couchbase node has been added to a cluster, "
- + "including being the first / primary node");
- AttributeSensor<URI> COUCHBASE_WEB_ADMIN_URL = Attributes.MAIN_URI;
-
- // Interesting stats
- AttributeSensor<Double> OPS = Sensors.newDoubleSensor("couchbase.stats.ops",
- "Retrieved from pools/nodes/<current node>/interestingStats/ops");
- AttributeSensor<Long> COUCH_DOCS_DATA_SIZE = Sensors.newLongSensor("couchbase.stats.couch.docs.data.size",
- "Retrieved from pools/nodes/<current node>/interestingStats/couch_docs_data_size");
- AttributeSensor<Long> COUCH_DOCS_ACTUAL_DISK_SIZE = Sensors.newLongSensor("couchbase.stats.couch.docs.actual.disk.size",
- "Retrieved from pools/nodes/<current node>/interestingStats/couch_docs_actual_disk_size");
- AttributeSensor<Long> EP_BG_FETCHED = Sensors.newLongSensor("couchbase.stats.ep.bg.fetched",
- "Retrieved from pools/nodes/<current node>/interestingStats/ep_bg_fetched");
- AttributeSensor<Long> MEM_USED = Sensors.newLongSensor("couchbase.stats.mem.used",
- "Retrieved from pools/nodes/<current node>/interestingStats/mem_used");
- AttributeSensor<Long> COUCH_VIEWS_ACTUAL_DISK_SIZE = Sensors.newLongSensor("couchbase.stats.couch.views.actual.disk.size",
- "Retrieved from pools/nodes/<current node>/interestingStats/couch_views_actual_disk_size");
- AttributeSensor<Long> CURR_ITEMS = Sensors.newLongSensor("couchbase.stats.curr.items",
- "Retrieved from pools/nodes/<current node>/interestingStats/curr_items");
- AttributeSensor<Long> VB_REPLICA_CURR_ITEMS = Sensors.newLongSensor("couchbase.stats.vb.replica.curr.items",
- "Retrieved from pools/nodes/<current node>/interestingStats/vb_replica_curr_items");
- AttributeSensor<Long> COUCH_VIEWS_DATA_SIZE = Sensors.newLongSensor("couchbase.stats.couch.views.data.size",
- "Retrieved from pools/nodes/<current node>/interestingStats/couch_views_data_size");
- AttributeSensor<Long> GET_HITS = Sensors.newLongSensor("couchbase.stats.get.hits",
- "Retrieved from pools/nodes/<current node>/interestingStats/get_hits");
- AttributeSensor<Double> CMD_GET = Sensors.newDoubleSensor("couchbase.stats.cmd.get",
- "Retrieved from pools/nodes/<current node>/interestingStats/cmd_get");
- AttributeSensor<Long> CURR_ITEMS_TOT = Sensors.newLongSensor("couchbase.stats.curr.items.tot",
- "Retrieved from pools/nodes/<current node>/interestingStats/curr_items_tot");
- AttributeSensor<String> REBALANCE_STATUS = Sensors.newStringSensor("couchbase.rebalance.status",
- "Displays the current rebalance status from pools/nodes/rebalanceStatus");
-
- class MainUri {
- public static final AttributeSensor<URI> MAIN_URI = Attributes.MAIN_URI;
-
- static {
- // ROOT_URL does not need init because it refers to something already initialized
- RendererHints.register(COUCHBASE_WEB_ADMIN_URL, RendererHints.namedActionWithUrl());
-
- RendererHints.register(COUCH_DOCS_DATA_SIZE, RendererHints.displayValue(ByteSizeStrings.metric()));
- RendererHints.register(COUCH_DOCS_ACTUAL_DISK_SIZE, RendererHints.displayValue(ByteSizeStrings.metric()));
- RendererHints.register(MEM_USED, RendererHints.displayValue(ByteSizeStrings.metric()));
- RendererHints.register(COUCH_VIEWS_ACTUAL_DISK_SIZE, RendererHints.displayValue(ByteSizeStrings.metric()));
- RendererHints.register(COUCH_VIEWS_DATA_SIZE, RendererHints.displayValue(ByteSizeStrings.metric()));
- }
- }
-
- // this long-winded reference is done just to trigger the initialization above
- AttributeSensor<URI> MAIN_URI = MainUri.MAIN_URI;
-
- MethodEffector<Void> SERVER_ADD = new MethodEffector<Void>(CouchbaseNode.class, "serverAdd");
- MethodEffector<Void> SERVER_ADD_AND_REBALANCE = new MethodEffector<Void>(CouchbaseNode.class, "serverAddAndRebalance");
- MethodEffector<Void> REBALANCE = new MethodEffector<Void>(CouchbaseNode.class, "rebalance");
- MethodEffector<Void> BUCKET_CREATE = new MethodEffector<Void>(CouchbaseNode.class, "bucketCreate");
- org.apache.brooklyn.api.effector.Effector<Void> ADD_REPLICATION_RULE = Effectors.effector(Void.class, "addReplicationRule")
- .description("Adds a replication rule from the indicated bucket on the cluster where this node is located "
- + "to the indicated cluster and optional destination bucket")
- .parameter(String.class, "fromBucket", "Bucket to be replicated")
- .parameter(Object.class, "toCluster", "Entity (or ID) of the cluster to which this should replicate")
- .parameter(String.class, "toBucket", "Destination bucket for replication in the toCluster, defaulting to the same as the fromBucket")
- .buildAbstract();
-
- @Effector(description = "add a server to a cluster")
- public void serverAdd(@EffectorParam(name = "serverHostname") String serverToAdd, @EffectorParam(name = "username") String username, @EffectorParam(name = "password") String password);
-
- @Effector(description = "add a server to a cluster, and immediately rebalances")
- public void serverAddAndRebalance(@EffectorParam(name = "serverHostname") String serverToAdd, @EffectorParam(name = "username") String username, @EffectorParam(name = "password") String password);
-
- @Effector(description = "rebalance the couchbase cluster")
- public void rebalance();
-
- @Effector(description = "create a new bucket")
- public void bucketCreate(@EffectorParam(name = "bucketName") String bucketName, @EffectorParam(name = "bucketType") String bucketType,
- @EffectorParam(name = "bucketPort") Integer bucketPort, @EffectorParam(name = "bucketRamSize") Integer bucketRamSize,
- @EffectorParam(name = "bucketReplica") Integer bucketReplica);
-
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java
deleted file mode 100644
index b5c797d..0000000
--- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.nosql.couchbase;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.entity.software.base.SoftwareProcessDriver;
-
-public interface CouchbaseNodeDriver extends SoftwareProcessDriver {
- public String getOsTag();
- public String getDownloadLinkPreVersionSeparator();
- public String getDownloadLinkOsTagWithPrefix();
-
- public String getCommunityOrEnterprise();
-
- public void serverAdd(String serverToAdd, String username, String password);
-
- public void rebalance();
-
- public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica);
-
- public void serverAddAndRebalance(String serverToAdd, String username, String password);
-
- public void addReplicationRule(Entity toCluster, String fromBucket, String toBucket);
-
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
deleted file mode 100644
index 68d3a54..0000000
--- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.nosql.couchbase;
-
-import static java.lang.String.format;
-
-import java.net.URI;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.location.MachineProvisioningLocation;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.api.sensor.SensorEvent;
-import org.apache.brooklyn.api.sensor.SensorEventListener;
-import org.apache.brooklyn.core.effector.EffectorBody;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.location.access.BrooklynAccessUtils;
-import org.apache.brooklyn.core.location.cloud.CloudLocationConfig;
-import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl;
-import org.apache.brooklyn.feed.http.HttpFeed;
-import org.apache.brooklyn.feed.http.HttpPollConfig;
-import org.apache.brooklyn.feed.http.HttpValueFunctions;
-import org.apache.brooklyn.feed.http.JsonFunctions;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.collections.MutableSet;
-import org.apache.brooklyn.util.core.config.ConfigBag;
-import org.apache.brooklyn.util.http.HttpTool;
-import org.apache.brooklyn.util.http.HttpToolResponse;
-import org.apache.brooklyn.util.core.task.Tasks;
-import org.apache.brooklyn.util.exceptions.Exceptions;
-import org.apache.brooklyn.util.guava.Functionals;
-import org.apache.brooklyn.util.guava.MaybeFunctions;
-import org.apache.brooklyn.util.guava.TypeTokens;
-import org.apache.brooklyn.util.net.Urls;
-import org.apache.brooklyn.util.text.Strings;
-import org.apache.brooklyn.util.time.Duration;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Preconditions;
-import com.google.common.net.HostAndPort;
-import com.google.common.net.HttpHeaders;
-import com.google.common.net.MediaType;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-
-public class CouchbaseNodeImpl extends SoftwareProcessImpl implements CouchbaseNode {
-
- private static final Logger log = LoggerFactory.getLogger(CouchbaseNodeImpl.class);
-
- private volatile HttpFeed httpFeed;
-
- @Override
- public Class<CouchbaseNodeDriver> getDriverInterface() {
- return CouchbaseNodeDriver.class;
- }
-
- @Override
- public CouchbaseNodeDriver getDriver() {
- return (CouchbaseNodeDriver) super.getDriver();
- }
-
- @Override
- public void init() {
- super.init();
-
- subscriptions().subscribe(this, Attributes.SERVICE_UP, new SensorEventListener<Boolean>() {
- @Override
- public void onEvent(SensorEvent<Boolean> booleanSensorEvent) {
- if (Boolean.TRUE.equals(booleanSensorEvent.getValue())) {
- Integer webPort = getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT);
- Preconditions.checkNotNull(webPort, CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT+" not set for %s; is an acceptable port available?", this);
- String hostAndPort = BrooklynAccessUtils.getBrooklynAccessibleAddress(CouchbaseNodeImpl.this, webPort).toString();
- sensors().set(CouchbaseNode.COUCHBASE_WEB_ADMIN_URL, URI.create(format("http://%s", hostAndPort)));
- }
- }
- });
-
- getMutableEntityType().addEffector(ADD_REPLICATION_RULE, new EffectorBody<Void>() {
- @Override
- public Void call(ConfigBag parameters) {
- addReplicationRule(parameters);
- return null;
- }
- });
- }
-
- protected Map<String, Object> obtainProvisioningFlags(@SuppressWarnings("rawtypes") MachineProvisioningLocation location) {
- ConfigBag result = ConfigBag.newInstance(super.obtainProvisioningFlags(location));
- result.configure(CloudLocationConfig.OS_64_BIT, true);
- return result.getAllConfig();
- }
-
- @Override
- protected Collection<Integer> getRequiredOpenPorts() {
- // TODO this creates a huge list of inbound ports; much better to define on a security group using range syntax!
- int erlangRangeStart = getConfig(NODE_DATA_EXCHANGE_PORT_RANGE_START).iterator().next();
- int erlangRangeEnd = getConfig(NODE_DATA_EXCHANGE_PORT_RANGE_END).iterator().next();
-
- Set<Integer> newPorts = MutableSet.<Integer>copyOf(super.getRequiredOpenPorts());
- newPorts.remove(erlangRangeStart);
- newPorts.remove(erlangRangeEnd);
- for (int i = erlangRangeStart; i <= erlangRangeEnd; i++)
- newPorts.add(i);
- return newPorts;
- }
-
- @Override
- public void serverAdd(String serverToAdd, String username, String password) {
- getDriver().serverAdd(serverToAdd, username, password);
- }
-
- @Override
- public void serverAddAndRebalance(String serverToAdd, String username, String password) {
- getDriver().serverAddAndRebalance(serverToAdd, username, password);
- }
-
- @Override
- public void rebalance() {
- getDriver().rebalance();
- }
-
- protected final static Function<HttpToolResponse, JsonElement> GET_THIS_NODE_STATS = Functionals.chain(
- HttpValueFunctions.jsonContents(),
- JsonFunctions.walk("nodes"),
- new Function<JsonElement, JsonElement>() {
- @Override public JsonElement apply(JsonElement input) {
- JsonArray nodes = input.getAsJsonArray();
- for (JsonElement element : nodes) {
- JsonElement thisNode = element.getAsJsonObject().get("thisNode");
- if (thisNode!=null && Boolean.TRUE.equals(thisNode.getAsBoolean())) {
- return element.getAsJsonObject().get("interestingStats");
- }
- }
- return null;
- }}
- );
-
- protected final static <T> HttpPollConfig<T> getSensorFromNodeStat(AttributeSensor<T> sensor, String ...jsonPath) {
- return new HttpPollConfig<T>(sensor)
- .onSuccess(Functionals.chain(GET_THIS_NODE_STATS,
- MaybeFunctions.<JsonElement>wrap(),
- JsonFunctions.walkM(jsonPath),
- JsonFunctions.castM(TypeTokens.getRawRawType(sensor.getTypeToken()), null)))
- .onFailureOrException(Functions.<T>constant(null));
- }
-
- @Override
- protected void postStart() {
- super.postStart();
- renameServerToPublicHostname();
- }
-
- protected void renameServerToPublicHostname() {
- // http://docs.couchbase.com/couchbase-manual-2.5/cb-install/#couchbase-getting-started-hostnames
- URI apiUri = null;
- try {
- HostAndPort accessible = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, getAttribute(COUCHBASE_WEB_ADMIN_PORT));
- apiUri = URI.create(String.format("http://%s:%d/node/controller/rename", accessible.getHostText(), accessible.getPort()));
- UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(getConfig(COUCHBASE_ADMIN_USERNAME), getConfig(COUCHBASE_ADMIN_PASSWORD));
- HttpToolResponse response = HttpTool.httpPost(
- // the uri is required by the HttpClientBuilder in order to set the AuthScope of the credentials
- HttpTool.httpClientBuilder().uri(apiUri).credentials(credentials).build(),
- apiUri,
- MutableMap.of(
- HttpHeaders.CONTENT_TYPE, MediaType.FORM_DATA.toString(),
- HttpHeaders.ACCEPT, "*/*",
- // this appears needed; without it we get org.apache.http.NoHttpResponseException !?
- HttpHeaders.AUTHORIZATION, HttpTool.toBasicAuthorizationValue(credentials)),
- Charsets.UTF_8.encode("hostname="+Urls.encode(accessible.getHostText())).array());
- log.debug("Renamed Couchbase server "+this+" via "+apiUri+": "+response);
- if (!HttpTool.isStatusCodeHealthy(response.getResponseCode())) {
- log.warn("Invalid response code, renaming {} ({}): {}",
- new Object[]{apiUri, response.getResponseCode(), response.getContentAsString()});
- }
- } catch (Exception e) {
- Exceptions.propagateIfFatal(e);
- log.warn("Error renaming server, using "+apiUri+": "+e, e);
- }
- }
-
- public void connectSensors() {
- super.connectSensors();
- connectServiceUpIsRunning();
-
- HostAndPort hostAndPort = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, this.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
- httpFeed = HttpFeed.builder()
- .entity(this)
- .period(Duration.seconds(3))
- .baseUri("http://" + hostAndPort + "/pools/nodes/")
- .credentialsIfNotNull(getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD))
- .poll(getSensorFromNodeStat(CouchbaseNode.OPS, "ops"))
- .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_DOCS_DATA_SIZE, "couch_docs_data_size"))
- .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_DOCS_ACTUAL_DISK_SIZE, "couch_docs_actual_disk_size"))
- .poll(getSensorFromNodeStat(CouchbaseNode.EP_BG_FETCHED, "ep_bg_fetched"))
- .poll(getSensorFromNodeStat(CouchbaseNode.MEM_USED, "mem_used"))
- .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_VIEWS_ACTUAL_DISK_SIZE, "couch_views_actual_disk_size"))
- .poll(getSensorFromNodeStat(CouchbaseNode.CURR_ITEMS, "curr_items"))
- .poll(getSensorFromNodeStat(CouchbaseNode.VB_REPLICA_CURR_ITEMS, "vb_replica_curr_items"))
- .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_VIEWS_DATA_SIZE, "couch_views_data_size"))
- .poll(getSensorFromNodeStat(CouchbaseNode.GET_HITS, "get_hits"))
- .poll(getSensorFromNodeStat(CouchbaseNode.CMD_GET, "cmd_get"))
- .poll(getSensorFromNodeStat(CouchbaseNode.CURR_ITEMS_TOT, "curr_items_tot"))
- .poll(new HttpPollConfig<String>(CouchbaseNode.REBALANCE_STATUS)
- .onSuccess(HttpValueFunctions.jsonContents("rebalanceStatus", String.class))
- .onFailureOrException(Functions.constant("Could not retrieve")))
- .build();
- }
-
- public void disconnectSensors() {
- super.disconnectSensors();
- disconnectServiceUpIsRunning();
- if (httpFeed != null) {
- httpFeed.stop();
- }
- }
-
- @Override
- public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica) {
- if (Strings.isBlank(bucketType)) bucketType = "couchbase";
- if (bucketRamSize==null || bucketRamSize<=0) bucketRamSize = 200;
- if (bucketReplica==null || bucketReplica<0) bucketReplica = 1;
-
- getDriver().bucketCreate(bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica);
- }
-
- /** exposed through {@link CouchbaseNode#ADD_REPLICATION_RULE} */
- protected void addReplicationRule(ConfigBag ruleArgs) {
- Object toClusterO = Preconditions.checkNotNull(ruleArgs.getStringKey("toCluster"), "toCluster must not be null");
- if (toClusterO instanceof String) {
- toClusterO = getManagementContext().lookup((String)toClusterO);
- }
- Entity toCluster = Tasks.resolving(toClusterO, Entity.class).context(getExecutionContext()).get();
-
- String fromBucket = Preconditions.checkNotNull( (String)ruleArgs.getStringKey("fromBucket"), "fromBucket must be specified" );
-
- String toBucket = (String)ruleArgs.getStringKey("toBucket");
- if (toBucket==null) toBucket = fromBucket;
-
- if (!ruleArgs.getUnusedConfig().isEmpty()) {
- throw new IllegalArgumentException("Unsupported replication rule data: "+ruleArgs.getUnusedConfig());
- }
-
- getDriver().addReplicationRule(toCluster, fromBucket, toBucket);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
deleted file mode 100644
index 915f06a..0000000
--- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
+++ /dev/null
@@ -1,511 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.nosql.couchbase;
-
-import static java.lang.String.format;
-import static org.apache.brooklyn.util.ssh.BashCommands.*;
-
-import java.net.URI;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.Callable;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.Group;
-import org.apache.brooklyn.api.location.OsDetails;
-import org.apache.brooklyn.api.mgmt.Task;
-import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.drivers.downloads.BasicDownloadRequirement;
-import org.apache.brooklyn.core.entity.drivers.downloads.DownloadProducerFromUrlAttribute;
-import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
-import org.apache.brooklyn.core.location.access.BrooklynAccessUtils;
-import org.apache.brooklyn.core.sensor.DependentConfiguration;
-import org.apache.brooklyn.entity.software.base.AbstractSoftwareProcessSshDriver;
-import org.apache.brooklyn.feed.http.HttpValueFunctions;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.brooklyn.location.ssh.SshMachineLocation;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.http.HttpTool;
-import org.apache.brooklyn.util.http.HttpToolResponse;
-import org.apache.brooklyn.util.core.task.DynamicTasks;
-import org.apache.brooklyn.util.core.task.TaskBuilder;
-import org.apache.brooklyn.util.core.task.TaskTags;
-import org.apache.brooklyn.util.core.task.Tasks;
-import org.apache.brooklyn.util.repeat.Repeater;
-import org.apache.brooklyn.util.ssh.BashCommands;
-import org.apache.brooklyn.util.text.NaturalOrderComparator;
-import org.apache.brooklyn.util.text.Strings;
-import org.apache.brooklyn.util.text.StringEscapes.BashStringEscapes;
-import org.apache.brooklyn.util.time.Duration;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.net.HostAndPort;
-
-public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver implements CouchbaseNodeDriver {
-
- public CouchbaseNodeSshDriver(final CouchbaseNodeImpl entity, final SshMachineLocation machine) {
- super(entity, machine);
- }
-
- public static String couchbaseCli(String cmd) {
- return "/opt/couchbase/bin/couchbase-cli " + cmd + " ";
- }
-
- @Override
- public void preInstall() {
- resolver = Entities.newDownloader(this);
- setExpandedInstallDir(getInstallDir());
- }
-
- @Override
- public void install() {
- //for reference https://github.com/urbandecoder/couchbase/blob/master/recipes/server.rb
- //installation instructions (http://docs.couchbase.com/couchbase-manual-2.5/cb-install/#preparing-to-install)
-
- List<String> urls = resolver.getTargets();
- String saveAs = resolver.getFilename();
-
- OsDetails osDetails = getMachine().getMachineDetails().getOsDetails();
-
- if (osDetails.isLinux()) {
- List<String> commands = installLinux(urls, saveAs);
- //FIXME installation return error but the server is up and running.
- newScript(INSTALLING)
- .body.append(commands).execute();
- } else {
- Tasks.markInessential();
- throw new IllegalStateException("Unsupported OS for installing Couchbase. Will continue but may fail later.");
- }
- }
-
- private List<String> installLinux(List<String> urls, String saveAs) {
-
- log.info("Installing " + getEntity() + " using couchbase-server-{} {}", getCommunityOrEnterprise(), getVersion());
-
- String apt = chainGroup(
- installPackage(MutableMap.of("apt", "python-httplib2 libssl0.9.8"), null),
- sudo(format("dpkg -i %s", saveAs)));
-
- String yum = chainGroup(
- "which yum",
- // The following prevents failure on RHEL AWS nodes:
- // https://forums.aws.amazon.com/thread.jspa?threadID=100509
- ok(sudo("sed -i.bk s/^enabled=1$/enabled=0/ /etc/yum/pluginconf.d/subscription-manager.conf")),
- ok(sudo("yum check-update")),
- sudo("yum install -y pkgconfig"),
- // RHEL requires openssl version 098
- sudo("[ -f /etc/redhat-release ] && (grep -i \"red hat\" /etc/redhat-release && sudo yum install -y openssl098e) || :"),
- sudo(format("rpm --install %s", saveAs)));
-
- String link = new DownloadProducerFromUrlAttribute().apply(new BasicDownloadRequirement(this)).getPrimaryLocations().iterator().next();
- return ImmutableList.<String>builder()
- .add(INSTALL_CURL)
- .addAll(Arrays.asList(INSTALL_CURL,
- BashCommands.require(BashCommands.alternatives(BashCommands.simpleDownloadUrlAs(urls, saveAs),
- // Referer link is required for 3.0.0; note mis-spelling is correct, as per http://en.wikipedia.org/wiki/HTTP_referer
- "curl -f -L -k " + BashStringEscapes.wrapBash(link)
- + " -H 'Referer: http://www.couchbase.com/downloads'"
- + " -o " + saveAs),
- "Could not retrieve " + saveAs + " (from " + urls.size() + " sites)", 9)))
- .add(alternatives(apt, yum))
- .build();
- }
-
- @Override
- public void customize() {
- //TODO: add linux tweaks for couchbase
- //http://blog.couchbase.com/often-overlooked-linux-os-tweaks
- //http://blog.couchbase.com/kirk
-
- //turn off swappiness
- //vm.swappiness=0
- //sudo echo 0 > /proc/sys/vm/swappiness
-
- //os page cache = 20%
-
- //disable THP
- //sudo echo never > /sys/kernel/mm/transparent_hugepage/enabled
- //sudo echo never > /sys/kernel/mm/transparent_hugepage/defrag
-
- //turn off transparent huge pages
- //limit page cache disty bytes
- //control the rate page cache is flused ... vm.dirty_*
- }
-
- @Override
- public void launch() {
- String clusterPrefix = "--cluster-" + (isPreV3() ? "init-" : "");
- // in v30, the cluster arguments were changed, and it became mandatory to supply a url + password (if there is none, these are ignored)
- newScript(LAUNCHING)
- .body.append(
- sudo("/etc/init.d/couchbase-server start"),
- "for i in {0..120}\n" +
- "do\n" +
- " if [ $i -eq 120 ]; then echo REST API unavailable after 120 seconds, failing; exit 1; fi;\n" +
- " curl -s " + String.format("http://localhost:%s", getWebPort()) + " > /dev/null && echo REST API available after $i seconds && break\n" +
- " sleep 1\n" +
- "done\n" +
- couchbaseCli("cluster-init") +
- (isPreV3() ? getCouchbaseHostnameAndPort() : getCouchbaseHostnameAndCredentials()) +
- " " + clusterPrefix + "username=" + getUsername() +
- " " + clusterPrefix + "password=" + getPassword() +
- " " + clusterPrefix + "port=" + getWebPort() +
- " " + clusterPrefix + "ramsize=" + getClusterInitRamSize())
- .execute();
- }
-
- @Override
- public boolean isRunning() {
- //TODO add a better way to check if couchbase server is running
- return (newScript(CHECK_RUNNING)
- .body.append(format("curl -u %s:%s http://localhost:%s/pools/nodes", getUsername(), getPassword(), getWebPort()))
- .execute() == 0);
- }
-
- @Override
- public void stop() {
- newScript(STOPPING)
- .body.append(sudo("/etc/init.d/couchbase-server stop"))
- .execute();
- }
-
- @Override
- public String getVersion() {
- return entity.getConfig(CouchbaseNode.SUGGESTED_VERSION);
- }
-
- @Override
- public String getOsTag() {
- return newDownloadLinkSegmentComputer().getOsTag();
- }
-
- protected DownloadLinkSegmentComputer newDownloadLinkSegmentComputer() {
- return new DownloadLinkSegmentComputer(getLocation().getOsDetails(), !isPreV3(), Strings.toString(getEntity()));
- }
-
- public static class DownloadLinkSegmentComputer {
- // links are:
- // http://packages.couchbase.com/releases/2.2.0/couchbase-server-community_2.2.0_x86_64.rpm
- // http://packages.couchbase.com/releases/2.2.0/couchbase-server-community_2.2.0_x86_64.deb
- // ^^^ preV3 is _ everywhere
- // http://packages.couchbase.com/releases/3.0.0/couchbase-server-community_3.0.0-ubuntu12.04_amd64.deb
- // ^^^ most V3 is _${version}-
- // http://packages.couchbase.com/releases/3.0.0/couchbase-server-community-3.0.0-centos6.x86_64.rpm
- // ^^^ but RHEL is -${version}-
-
- @Nullable
- private final OsDetails os;
- @Nonnull
- private final boolean isV3OrLater;
- @Nonnull
- private final String context;
- @Nonnull
- private final String osName;
- @Nonnull
- private final boolean isRpm;
- @Nonnull
- private final boolean is64bit;
-
- public DownloadLinkSegmentComputer(@Nullable OsDetails os, boolean isV3OrLater, @Nonnull String context) {
- this.os = os;
- this.isV3OrLater = isV3OrLater;
- this.context = context;
- if (os == null) {
- // guess centos as RPM is sensible default
- log.warn("No details known for OS of " + context + "; assuming 64-bit RPM distribution of Couchbase");
- osName = "centos";
- isRpm = true;
- is64bit = true;
- return;
- }
- osName = os.getName().toLowerCase();
- isRpm = !(osName.contains("deb") || osName.contains("ubuntu"));
- is64bit = os.is64bit();
- }
-
- /**
- * separator after the version number used to be _ but is - in 3.0 and later
- */
- public String getPreVersionSeparator() {
- if (!isV3OrLater) return "_";
- if (isRpm) return "-";
- return "_";
- }
-
- public String getOsTag() {
- // couchbase only provide certain versions; if on other platforms let's suck-it-and-see
- String family;
- if (osName.contains("debian")) family = "debian7_";
- else if (osName.contains("ubuntu")) family = "ubuntu12.04_";
- else if (osName.contains("centos") || osName.contains("rhel") || (osName.contains("red") && osName.contains("hat")))
- family = "centos6.";
- else {
- log.warn("Unrecognised OS " + os + " of " + context + "; assuming RPM distribution of Couchbase");
- family = "centos6.";
- }
-
- if (!is64bit && !isV3OrLater) {
- // NB: 32-bit binaries aren't (yet?) available for v30
- log.warn("32-bit binaries for Couchbase might not be available, when deploying " + context);
- }
- String arch = !is64bit ? "x86" : !isRpm && isV3OrLater ? "amd64" : "x86_64";
- String fileExtension = isRpm ? ".rpm" : ".deb";
-
- if (isV3OrLater)
- return family + arch + fileExtension;
- else
- return arch + fileExtension;
- }
-
- public String getOsTagWithPrefix() {
- return (!isV3OrLater ? "_" : "-") + getOsTag();
- }
- }
-
- @Override
- public String getDownloadLinkOsTagWithPrefix() {
- return newDownloadLinkSegmentComputer().getOsTagWithPrefix();
- }
-
- @Override
- public String getDownloadLinkPreVersionSeparator() {
- return newDownloadLinkSegmentComputer().getPreVersionSeparator();
- }
-
- private boolean isPreV3() {
- return NaturalOrderComparator.INSTANCE.compare(getEntity().getConfig(CouchbaseNode.SUGGESTED_VERSION), "3.0") < 0;
- }
-
- @Override
- public String getCommunityOrEnterprise() {
- Boolean isEnterprise = getEntity().getConfig(CouchbaseNode.USE_ENTERPRISE);
- return isEnterprise ? "enterprise" : "community";
- }
-
- private String getUsername() {
- return entity.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME);
- }
-
- private String getPassword() {
- return entity.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD);
- }
-
- private String getWebPort() {
- return "" + entity.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT);
- }
-
- private String getCouchbaseHostnameAndCredentials() {
- return format("-c %s:%s -u %s -p %s", getSubnetHostname(), getWebPort(), getUsername(), getPassword());
- }
-
- private String getCouchbaseHostnameAndPort() {
- return format("-c %s:%s", getSubnetHostname(), getWebPort());
- }
-
- private String getClusterInitRamSize() {
- return entity.getConfig(CouchbaseNode.COUCHBASE_CLUSTER_INIT_RAM_SIZE).toString();
- }
-
- @Override
- public void rebalance() {
- entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "explicitly started");
- newScript("rebalance")
- .body.append(
- couchbaseCli("rebalance") + getCouchbaseHostnameAndCredentials())
- .failOnNonZeroResultCode()
- .execute();
-
- // wait until the re-balance is started
- // (if it's quick, this might miss it, but it will only block for 30s if so)
- Repeater.create()
- .backoff(Repeater.DEFAULT_REAL_QUICK_PERIOD, 2, Duration.millis(500))
- .limitTimeTo(Duration.THIRTY_SECONDS)
- .until(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- for (HostAndPort nodeHostAndPort : getNodesHostAndPort()) {
- if (isNodeRebalancing(nodeHostAndPort.toString())) {
- return true;
- }
- }
- return false;
- }
- }
- ).run();
-
- entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "waiting for completion");
- // Wait until the Couchbase node finishes the re-balancing
- Task<Boolean> reBalance = TaskBuilder.<Boolean>builder()
- .displayName("Waiting until node is rebalancing")
- .body(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- return Repeater.create()
- .backoff(Duration.ONE_SECOND, 1.2, Duration.TEN_SECONDS)
- .limitTimeTo(Duration.FIVE_MINUTES)
- .until(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- for (HostAndPort nodeHostAndPort : getNodesHostAndPort()) {
- if (isNodeRebalancing(nodeHostAndPort.toString())) {
- return false;
- }
- }
- return true;
- }
- })
- .run();
- }
- })
- .build();
- Boolean completed = DynamicTasks.queueIfPossible(reBalance)
- .orSubmitAndBlock()
- .andWaitForSuccess();
- if (completed) {
- entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "completed");
- ServiceStateLogic.ServiceNotUpLogic.clearNotUpIndicator(getEntity(), "rebalancing");
- log.info("Rebalanced cluster via primary node {}", getEntity());
- } else {
- entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "timed out");
- ServiceStateLogic.ServiceNotUpLogic.updateNotUpIndicator(getEntity(), "rebalancing", "rebalance did not complete within time limit");
- log.warn("Timeout rebalancing cluster via primary node {}", getEntity());
- }
- }
-
- private Iterable<HostAndPort> getNodesHostAndPort() {
- Group group = Iterables.getFirst(getEntity().groups(), null);
- if (group == null) return Lists.newArrayList();
- return Iterables.transform(group.getAttribute(CouchbaseCluster.COUCHBASE_CLUSTER_UP_NODES),
- new Function<Entity, HostAndPort>() {
- @Override
- public HostAndPort apply(Entity input) {
- return BrooklynAccessUtils.getBrooklynAccessibleAddress(input, input.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
- }
- });
- }
-
- private boolean isNodeRebalancing(String nodeHostAndPort) {
- HttpToolResponse response = getApiResponse("http://" + nodeHostAndPort + "/pools/default/rebalanceProgress");
- if (response.getResponseCode() != 200) {
- throw new IllegalStateException("failed retrieving rebalance status: " + response);
- }
- return !"none".equals(HttpValueFunctions.jsonContents("status", String.class).apply(response));
- }
-
- private HttpToolResponse getApiResponse(String uri) {
- return HttpTool.httpGet(HttpTool.httpClientBuilder()
- // the uri is required by the HttpClientBuilder in order to set the AuthScope of the credentials
- .uri(uri)
- .credentials(new UsernamePasswordCredentials(getUsername(), getPassword()))
- .build(),
- URI.create(uri),
- ImmutableMap.<String, String>of());
- }
-
- @Override
- public void serverAdd(String serverToAdd, String username, String password) {
- newScript("serverAdd").body.append(couchbaseCli("server-add")
- + getCouchbaseHostnameAndCredentials() +
- " --server-add=" + BashStringEscapes.wrapBash(serverToAdd) +
- " --server-add-username=" + BashStringEscapes.wrapBash(username) +
- " --server-add-password=" + BashStringEscapes.wrapBash(password))
- .failOnNonZeroResultCode()
- .execute();
- }
-
- @Override
- public void serverAddAndRebalance(String serverToAdd, String username, String password) {
- newScript("serverAddAndRebalance").body.append(couchbaseCli("rebalance")
- + getCouchbaseHostnameAndCredentials() +
- " --server-add=" + BashStringEscapes.wrapBash(serverToAdd) +
- " --server-add-username=" + BashStringEscapes.wrapBash(username) +
- " --server-add-password=" + BashStringEscapes.wrapBash(password))
- .failOnNonZeroResultCode()
- .execute();
- entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "triggered as part of server-add");
- }
-
- @Override
- public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica) {
- log.info("Adding bucket: {} to cluster {} primary node: {}", new Object[]{bucketName, CouchbaseClusterImpl.getClusterOrNode(getEntity()), getEntity()});
-
- newScript("bucketCreate").body.append(couchbaseCli("bucket-create")
- + getCouchbaseHostnameAndCredentials() +
- " --bucket=" + BashStringEscapes.wrapBash(bucketName) +
- " --bucket-type=" + BashStringEscapes.wrapBash(bucketType) +
- " --bucket-port=" + bucketPort +
- " --bucket-ramsize=" + bucketRamSize +
- " --bucket-replica=" + bucketReplica)
- .failOnNonZeroResultCode()
- .execute();
- }
-
- @Override
- public void addReplicationRule(Entity toCluster, String fromBucket, String toBucket) {
- DynamicTasks.queue(DependentConfiguration.attributeWhenReady(toCluster, Attributes.SERVICE_UP)).getUnchecked();
-
- String destName = CouchbaseClusterImpl.getClusterName(toCluster);
-
- log.info("Setting up XDCR for " + fromBucket + " from " + CouchbaseClusterImpl.getClusterName(getEntity()) + " (via " + getEntity() + ") "
- + "to " + destName + " (" + toCluster + ")");
-
- Entity destPrimaryNode = toCluster.getAttribute(CouchbaseCluster.COUCHBASE_PRIMARY_NODE);
- String destHostname = destPrimaryNode.getAttribute(Attributes.HOSTNAME);
- String destUsername = toCluster.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME);
- String destPassword = toCluster.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD);
-
- // on the REST API there is mention of a 'type' 'continuous' but i don't see other refs to this
-
- // PROTOCOL Select REST protocol or memcached for replication. xmem indicates memcached while capi indicates REST protocol.
- // looks like xmem is the default; leave off for now
-// String replMode = "xmem";
-
- DynamicTasks.queue(TaskTags.markInessential(SshEffectorTasks.ssh(
- couchbaseCli("xdcr-setup") +
- getCouchbaseHostnameAndCredentials() +
- " --create" +
- " --xdcr-cluster-name=" + BashStringEscapes.wrapBash(destName) +
- " --xdcr-hostname=" + BashStringEscapes.wrapBash(destHostname) +
- " --xdcr-username=" + BashStringEscapes.wrapBash(destUsername) +
- " --xdcr-password=" + BashStringEscapes.wrapBash(destPassword)
- ).summary("create xdcr destination " + destName).newTask()));
-
- // would be nice to auto-create bucket, but we'll need to know the parameters; the port in particular is tedious
-// ((CouchbaseNode)destPrimaryNode).bucketCreate(toBucket, "couchbase", null, 0, 0);
-
- DynamicTasks.queue(SshEffectorTasks.ssh(
- couchbaseCli("xdcr-replicate") +
- getCouchbaseHostnameAndCredentials() +
- " --create" +
- " --xdcr-cluster-name=" + BashStringEscapes.wrapBash(destName) +
- " --xdcr-from-bucket=" + BashStringEscapes.wrapBash(fromBucket) +
- " --xdcr-to-bucket=" + BashStringEscapes.wrapBash(toBucket)
-// + " --xdcr-replication-mode="+replMode
- ).summary("configure replication for " + fromBucket + " to " + destName + ":" + toBucket).newTask());
- }
-}