You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by gr...@apache.org on 2014/11/12 14:49:08 UTC
[1/2] incubator-brooklyn git commit: Couchbase changes for Clocker
Repository: incubator-brooklyn
Updated Branches:
refs/heads/master 27df750fa -> 59208f56a
Couchbase changes for Clocker
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/1784c21d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/1784c21d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/1784c21d
Branch: refs/heads/master
Commit: 1784c21d68381a19fdae793cc53e71f9662675e4
Parents: 79e1c94
Author: Andrea Turli <an...@gmail.com>
Authored: Tue Nov 4 21:12:45 2014 +0000
Committer: Andrea Turli <an...@gmail.com>
Committed: Wed Nov 12 14:33:59 2014 +0100
----------------------------------------------------------------------
.../nosql/couchbase/CouchbaseClusterImpl.java | 18 +-
.../nosql/couchbase/CouchbaseNodeImpl.java | 83 ++--
.../nosql/couchbase/CouchbaseNodeSshDriver.java | 378 ++++++++-----------
.../nosql/src/main/resources/couchbase-logo.png | Bin 0 -> 88089 bytes
4 files changed, 216 insertions(+), 263 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/1784c21d/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
index 7c2a496..7552928 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
@@ -39,6 +39,7 @@ import brooklyn.entity.basic.Attributes;
import brooklyn.entity.basic.Entities;
import brooklyn.entity.basic.EntityInternal;
import brooklyn.entity.basic.ServiceStateLogic;
+import brooklyn.entity.basic.SoftwareProcess;
import brooklyn.entity.effector.Effectors;
import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
import brooklyn.entity.group.DynamicClusterImpl;
@@ -284,8 +285,8 @@ public class CouchbaseClusterImpl extends DynamicClusterImpl implements Couchbas
@Override public List<String> apply(Set<Entity> input) {
List<String> addresses = Lists.newArrayList();
for (Entity entity : input) {
- addresses.add(String.format("%s:%s", entity.getAttribute(Attributes.ADDRESS),
- entity.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT)));
+ addresses.add(String.format("%s",
+ BrooklynAccessUtils.getBrooklynAccessibleAddress(entity, entity.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT))));
}
return addresses;
}
@@ -447,7 +448,8 @@ public class CouchbaseClusterImpl extends DynamicClusterImpl implements Couchbas
return;
}
if (!isMemberInCluster(serverToAdd)) {
- HostAndPort webAdmin = BrooklynAccessUtils.getBrooklynAccessibleAddress(serverToAdd, serverToAdd.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
+ HostAndPort webAdmin = HostAndPort.fromParts(serverToAdd.getAttribute(SoftwareProcess.SUBNET_HOSTNAME),
+ serverToAdd.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
String username = serverToAdd.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME);
String password = serverToAdd.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD);
@@ -512,15 +514,17 @@ public class CouchbaseClusterImpl extends DynamicClusterImpl implements Couchbas
CouchbaseClusterImpl.this.resetBucketCreation.get().stop();
}
setAttribute(CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, true);
-
+ HostAndPort hostAndPort = BrooklynAccessUtils.getBrooklynAccessibleAddress(primaryNode, primaryNode.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
+
CouchbaseClusterImpl.this.resetBucketCreation.set(HttpFeed.builder()
.entity(CouchbaseClusterImpl.this)
.period(500, TimeUnit.MILLISECONDS)
- .baseUri(String.format("%s/pools/default/buckets/%s", primaryNode.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_URL), bucketName))
+ .baseUri(String.format("http://%s/pools/default/buckets/%s", hostAndPort, bucketName))
.credentials(primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD))
.poll(new HttpPollConfig<Boolean>(BUCKET_CREATION_IN_PROGRESS)
.onSuccess(Functionals.chain(HttpValueFunctions.jsonContents(), JsonFunctions.walkN("nodes"), new Function<JsonElement, Boolean>() {
- @Override public Boolean apply(JsonElement input) {
+ @Override
+ public Boolean apply(JsonElement input) {
// Wait until bucket has been created on all nodes and the couchApiBase element has been published (indicating that the bucket is useable)
JsonArray servers = input.getAsJsonArray();
if (servers.size() != CouchbaseClusterImpl.this.getMembers().size()) {
@@ -544,7 +548,7 @@ public class CouchbaseClusterImpl extends DynamicClusterImpl implements Couchbas
}
}
if (input instanceof Throwable)
- Exceptions.propagate((Throwable)input);
+ Exceptions.propagate((Throwable) input);
throw new IllegalStateException("Unexpected response when creating bucket:" + input);
}
}))
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/1784c21d/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
index a459637..e4b1c0a 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
@@ -21,6 +21,7 @@ package brooklyn.entity.nosql.couchbase;
import static java.lang.String.format;
import java.net.URI;
+import java.nio.charset.Charset;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
@@ -59,18 +60,21 @@ import brooklyn.util.task.Tasks;
import brooklyn.util.text.Strings;
import brooklyn.util.time.Duration;
+import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
+import com.google.common.net.HostAndPort;
+import com.google.common.net.HttpHeaders;
import com.google.common.net.MediaType;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
public class CouchbaseNodeImpl extends SoftwareProcessImpl implements CouchbaseNode {
-
+
private static final Logger log = LoggerFactory.getLogger(CouchbaseNodeImpl.class);
-
- HttpFeed httpFeed;
+
+ private volatile HttpFeed httpFeed;
@Override
public Class<CouchbaseNodeDriver> getDriverInterface() {
@@ -85,7 +89,7 @@ public class CouchbaseNodeImpl extends SoftwareProcessImpl implements CouchbaseN
@Override
public void init() {
super.init();
-
+
subscribe(this, Attributes.SERVICE_UP, new SensorEventListener<Boolean>() {
@Override
public void onEvent(SensorEvent<Boolean> booleanSensorEvent) {
@@ -97,7 +101,7 @@ public class CouchbaseNodeImpl extends SoftwareProcessImpl implements CouchbaseN
}
}
});
-
+
getMutableEntityType().addEffector(ADD_REPLICATION_RULE, new EffectorBody<Void>() {
@Override
public Void call(ConfigBag parameters) {
@@ -106,7 +110,7 @@ public class CouchbaseNodeImpl extends SoftwareProcessImpl implements CouchbaseN
}
});
}
-
+
protected Map<String, Object> obtainProvisioningFlags(@SuppressWarnings("rawtypes") MachineProvisioningLocation location) {
ConfigBag result = ConfigBag.newInstance(super.obtainProvisioningFlags(location));
result.configure(CloudLocationConfig.OS_64_BIT, true);
@@ -143,8 +147,8 @@ public class CouchbaseNodeImpl extends SoftwareProcessImpl implements CouchbaseN
}
protected final static Function<HttpToolResponse, JsonElement> GET_THIS_NODE_STATS = Functionals.chain(
- HttpValueFunctions.jsonContents(),
- JsonFunctions.walk("nodes"),
+ HttpValueFunctions.jsonContents(),
+ JsonFunctions.walk("nodes"),
new Function<JsonElement, JsonElement>() {
@Override public JsonElement apply(JsonElement input) {
JsonArray nodes = input.getAsJsonArray();
@@ -157,12 +161,12 @@ public class CouchbaseNodeImpl extends SoftwareProcessImpl implements CouchbaseN
return null;
}}
);
-
+
protected final static <T> HttpPollConfig<T> getSensorFromNodeStat(AttributeSensor<T> sensor, String ...jsonPath) {
return new HttpPollConfig<T>(sensor)
- .onSuccess(Functionals.chain(GET_THIS_NODE_STATS,
- MaybeFunctions.<JsonElement>wrap(),
- JsonFunctions.walkM(jsonPath),
+ .onSuccess(Functionals.chain(GET_THIS_NODE_STATS,
+ MaybeFunctions.<JsonElement>wrap(),
+ JsonFunctions.walkM(jsonPath),
JsonFunctions.castM(TypeTokens.getRawRawType(sensor.getTypeToken()), null)))
.onFailureOrException(Functions.<T>constant(null));
}
@@ -172,28 +176,24 @@ public class CouchbaseNodeImpl extends SoftwareProcessImpl implements CouchbaseN
super.postStart();
renameServerToPublicHostname();
}
-
+
protected void renameServerToPublicHostname() {
// http://docs.couchbase.com/couchbase-manual-2.5/cb-install/#couchbase-getting-started-hostnames
URI apiUri = null;
try {
- String hostname = getAttribute(Attributes.HOSTNAME);
- String port = ""+getAttribute(COUCHBASE_WEB_ADMIN_PORT);
- apiUri = new URI("http://"+hostname+":"+port+"/node/controller/rename");
- UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(
- getConfig(COUCHBASE_ADMIN_USERNAME), getConfig(COUCHBASE_ADMIN_PASSWORD));
- HttpToolResponse response = HttpTool.httpPost(HttpTool.httpClientBuilder()
- // the uri is required by the HttpClientBuilder in order to set the AuthScope of the credentials
- .uri(apiUri)
- .credentials(credentials)
- .build(),
- apiUri,
- MutableMap.of(
- com.google.common.net.HttpHeaders.CONTENT_TYPE, MediaType.FORM_DATA.toString(),
- com.google.common.net.HttpHeaders.ACCEPT, "*/*",
- // this appears needed; without it we get org.apache.http.NoHttpResponseException !?
- com.google.common.net.HttpHeaders.AUTHORIZATION, HttpTool.toBasicAuthorizationValue(credentials)),
- ("hostname="+Urls.encode(hostname)).getBytes());
+ HostAndPort accessible = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, getAttribute(COUCHBASE_WEB_ADMIN_PORT));
+ apiUri = URI.create(String.format("http://%s:%d/node/controller/rename", accessible.getHostText(), accessible.getPort()));
+ UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(getConfig(COUCHBASE_ADMIN_USERNAME), getConfig(COUCHBASE_ADMIN_PASSWORD));
+ HttpToolResponse response = HttpTool.httpPost(
+ // the uri is required by the HttpClientBuilder in order to set the AuthScope of the credentials
+ HttpTool.httpClientBuilder().uri(apiUri).credentials(credentials).build(),
+ apiUri,
+ MutableMap.of(
+ HttpHeaders.CONTENT_TYPE, MediaType.FORM_DATA.toString(),
+ HttpHeaders.ACCEPT, "*/*",
+ // this appears needed; without it we get org.apache.http.NoHttpResponseException !?
+ HttpHeaders.AUTHORIZATION, HttpTool.toBasicAuthorizationValue(credentials)),
+ Charsets.UTF_8.encode("hostname="+Urls.encode(accessible.getHostText())).array());
log.debug("Renamed Couchbase server "+this+" via "+apiUri+": "+response);
if (!HttpTool.isStatusCodeHealthy(response.getResponseCode())) {
log.warn("Invalid response code, renaming "+apiUri+": "+response);
@@ -207,13 +207,12 @@ public class CouchbaseNodeImpl extends SoftwareProcessImpl implements CouchbaseN
public void connectSensors() {
super.connectSensors();
connectServiceUpIsRunning();
-
- URI adminUrl = DynamicTasks.submit(DependentConfiguration.attributeWhenReady(this, CouchbaseNode.COUCHBASE_WEB_ADMIN_URL), this).getUnchecked(Duration.TWO_MINUTES);
-
+
+ HostAndPort hostAndPort = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, this.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
httpFeed = HttpFeed.builder()
.entity(this)
.period(Duration.seconds(3))
- .baseUri(adminUrl + "/pools/nodes/")
+ .baseUri("http://" + hostAndPort + "/pools/nodes/")
.credentialsIfNotNull(getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD))
.poll(getSensorFromNodeStat(CouchbaseNode.OPS, "ops"))
.poll(getSensorFromNodeStat(CouchbaseNode.COUCH_DOCS_DATA_SIZE, "couch_docs_data_size"))
@@ -228,8 +227,8 @@ public class CouchbaseNodeImpl extends SoftwareProcessImpl implements CouchbaseN
.poll(getSensorFromNodeStat(CouchbaseNode.CMD_GET, "cmd_get"))
.poll(getSensorFromNodeStat(CouchbaseNode.CURR_ITEMS_TOT, "curr_items_tot"))
.poll(new HttpPollConfig<String>(CouchbaseNode.REBALANCE_STATUS)
- .onSuccess(HttpValueFunctions.jsonContents("rebalanceStatus", String.class))
- .onFailureOrException(Functions.constant("Could not retrieve")))
+ .onSuccess(HttpValueFunctions.jsonContents("rebalanceStatus", String.class))
+ .onFailureOrException(Functions.constant("Could not retrieve")))
.build();
}
@@ -246,10 +245,10 @@ public class CouchbaseNodeImpl extends SoftwareProcessImpl implements CouchbaseN
if (Strings.isBlank(bucketType)) bucketType = "couchbase";
if (bucketRamSize==null || bucketRamSize<=0) bucketRamSize = 200;
if (bucketReplica==null || bucketReplica<0) bucketReplica = 1;
-
+
getDriver().bucketCreate(bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica);
}
-
+
/** exposed through {@link CouchbaseNode#ADD_REPLICATION_RULE} */
protected void addReplicationRule(ConfigBag ruleArgs) {
Object toClusterO = Preconditions.checkNotNull(ruleArgs.getStringKey("toCluster"), "toCluster must not be null");
@@ -257,16 +256,16 @@ public class CouchbaseNodeImpl extends SoftwareProcessImpl implements CouchbaseN
toClusterO = getManagementContext().lookup((String)toClusterO);
}
Entity toCluster = Tasks.resolving(toClusterO, Entity.class).context(getExecutionContext()).get();
-
+
String fromBucket = Preconditions.checkNotNull( (String)ruleArgs.getStringKey("fromBucket"), "fromBucket must be specified" );
-
+
String toBucket = (String)ruleArgs.getStringKey("toBucket");
if (toBucket==null) toBucket = fromBucket;
-
+
if (!ruleArgs.getUnusedConfig().isEmpty()) {
throw new IllegalArgumentException("Unsupported replication rule data: "+ruleArgs.getUnusedConfig());
}
-
+
getDriver().addReplicationRule(toCluster, fromBucket, toBucket);
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/1784c21d/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
index 5c5f4c1..129bf3d 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java
@@ -18,29 +18,21 @@
*/
package brooklyn.entity.nosql.couchbase;
-import static brooklyn.util.ssh.BashCommands.INSTALL_CURL;
-import static brooklyn.util.ssh.BashCommands.alternatives;
-import static brooklyn.util.ssh.BashCommands.chainGroup;
-import static brooklyn.util.ssh.BashCommands.ok;
-import static brooklyn.util.ssh.BashCommands.sudo;
+import static brooklyn.util.ssh.BashCommands.*;
import static java.lang.String.format;
import java.net.URI;
import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import org.apache.http.HttpHeaders;
import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.HttpClient;
-import org.apache.http.entity.ContentType;
import brooklyn.entity.Entity;
+import brooklyn.entity.Group;
import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver;
import brooklyn.entity.basic.Attributes;
import brooklyn.entity.basic.Entities;
@@ -50,29 +42,30 @@ import brooklyn.entity.drivers.downloads.DownloadProducerFromUrlAttribute;
import brooklyn.entity.software.SshEffectorTasks;
import brooklyn.event.basic.DependentConfiguration;
import brooklyn.event.feed.http.HttpValueFunctions;
-import brooklyn.event.feed.http.JsonFunctions;
import brooklyn.location.OsDetails;
+import brooklyn.location.access.BrooklynAccessUtils;
import brooklyn.location.basic.SshMachineLocation;
-import brooklyn.util.guava.Functionals;
+import brooklyn.management.Task;
+import brooklyn.util.collections.MutableMap;
import brooklyn.util.http.HttpTool;
import brooklyn.util.http.HttpToolResponse;
-import brooklyn.util.net.Urls;
import brooklyn.util.repeat.Repeater;
import brooklyn.util.ssh.BashCommands;
import brooklyn.util.task.DynamicTasks;
+import brooklyn.util.task.TaskBuilder;
import brooklyn.util.task.TaskTags;
import brooklyn.util.task.Tasks;
import brooklyn.util.text.NaturalOrderComparator;
import brooklyn.util.text.StringEscapes.BashStringEscapes;
+import brooklyn.util.text.Strings;
import brooklyn.util.time.Duration;
import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
+import com.google.common.net.HostAndPort;
public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver implements CouchbaseNodeDriver {
@@ -113,15 +106,10 @@ public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver imp
private List<String> installLinux(List<String> urls, String saveAs) {
- log.info("Installing "+getEntity()+" using couchbase-server-{} {}", getCommunityOrEnterprise(), getVersion());
+ log.info("Installing " + getEntity() + " using couchbase-server-{} {}", getCommunityOrEnterprise(), getVersion());
String apt = chainGroup(
- "export DEBIAN_FRONTEND=noninteractive",
- "which apt-get",
- sudo("apt-get update"),
- // The following line is required to run on Docker container
- sudo("apt-get install -y python-httplib2"),
- sudo("apt-get install -y libssl0.9.8"),
+ installPackage(MutableMap.of("apt", "python-httplib2 libssl0.9.8"), null),
sudo(format("dpkg -i %s", saveAs)));
String yum = chainGroup(
@@ -138,13 +126,13 @@ public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver imp
String link = new DownloadProducerFromUrlAttribute().apply(new BasicDownloadRequirement(this)).getPrimaryLocations().iterator().next();
return ImmutableList.<String>builder()
.add(INSTALL_CURL)
- .addAll(Arrays.asList(INSTALL_CURL,
- BashCommands.require(BashCommands.alternatives(BashCommands.simpleDownloadUrlAs(urls, saveAs),
- // Referer link is required for 3.0.0; note mis-spelling is correct, as per http://en.wikipedia.org/wiki/HTTP_referer
- "curl -f -L -k "+BashStringEscapes.wrapBash(link)
- + " -H 'Referer: http://www.couchbase.com/downloads'"
- + " -o "+saveAs),
- "Could not retrieve "+saveAs+" (from "+urls.size()+" sites)", 9)))
+ .addAll(Arrays.asList(INSTALL_CURL,
+ BashCommands.require(BashCommands.alternatives(BashCommands.simpleDownloadUrlAs(urls, saveAs),
+ // Referer link is required for 3.0.0; note mis-spelling is correct, as per http://en.wikipedia.org/wiki/HTTP_referer
+ "curl -f -L -k " + BashStringEscapes.wrapBash(link)
+ + " -H 'Referer: http://www.couchbase.com/downloads'"
+ + " -o " + saveAs),
+ "Could not retrieve " + saveAs + " (from " + urls.size() + " sites)", 9)))
.add(alternatives(apt, yum))
.build();
}
@@ -160,11 +148,11 @@ public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver imp
//sudo echo 0 > /proc/sys/vm/swappiness
//os page cache = 20%
-
+
//disable THP
//sudo echo never > /sys/kernel/mm/transparent_hugepage/enabled
//sudo echo never > /sys/kernel/mm/transparent_hugepage/defrag
-
+
//turn off transparent huge pages
//limit page cache disty bytes
//control the rate page cache is flused ... vm.dirty_*
@@ -172,23 +160,23 @@ public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver imp
@Override
public void launch() {
- String clusterPrefix = "--cluster-"+(isPreV3() ? "init-" : "");
+ String clusterPrefix = "--cluster-" + (isPreV3() ? "init-" : "");
// in v30, the cluster arguments were changed, and it became mandatory to supply a url + password (if there is none, these are ignored)
newScript(LAUNCHING)
.body.append(
sudo("/etc/init.d/couchbase-server start"),
"for i in {0..120}\n" +
- "do\n" +
- " if [ $i -eq 120 ]; then echo REST API unavailable after 120 seconds, failing; exit 1; fi;\n" +
- " curl -s " + String.format("http://localhost:%s", getWebPort()) + " > /dev/null && echo REST API available after $i seconds && break\n" +
- " sleep 1\n" +
- "done\n" +
- couchbaseCli("cluster-init") +
+ "do\n" +
+ " if [ $i -eq 120 ]; then echo REST API unavailable after 120 seconds, failing; exit 1; fi;\n" +
+ " curl -s " + String.format("http://localhost:%s", getWebPort()) + " > /dev/null && echo REST API available after $i seconds && break\n" +
+ " sleep 1\n" +
+ "done\n" +
+ couchbaseCli("cluster-init") +
(isPreV3() ? getCouchbaseHostnameAndPort() : getCouchbaseHostnameAndCredentials()) +
- " "+clusterPrefix+"username=" + getUsername() +
- " "+clusterPrefix+"password=" + getPassword() +
- " "+clusterPrefix+"port=" + getWebPort() +
- " "+clusterPrefix+"ramsize=" + getClusterInitRamSize())
+ " " + clusterPrefix + "username=" + getUsername() +
+ " " + clusterPrefix + "password=" + getPassword() +
+ " " + clusterPrefix + "port=" + getWebPort() +
+ " " + clusterPrefix + "ramsize=" + getClusterInitRamSize())
.execute();
}
@@ -214,17 +202,15 @@ public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver imp
@Override
public String getOsTag() {
- return newDownloadLinkSegmentComputer().getOsTag();
+ return newDownloadLinkSegmentComputer().getOsTag();
}
-
+
protected DownloadLinkSegmentComputer newDownloadLinkSegmentComputer() {
- return new DownloadLinkSegmentComputer(getLocation().getOsDetails(), !isPreV3(), ""+getEntity());
+ return new DownloadLinkSegmentComputer(getLocation().getOsDetails(), !isPreV3(), Strings.toString(getEntity()));
}
-
- public static class DownloadLinkSegmentComputer {
+ public static class DownloadLinkSegmentComputer {
// links are:
-
// http://packages.couchbase.com/releases/2.2.0/couchbase-server-community_2.2.0_x86_64.rpm
// http://packages.couchbase.com/releases/2.2.0/couchbase-server-community_2.2.0_x86_64.deb
// ^^^ preV3 is _ everywhere
@@ -232,21 +218,27 @@ public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver imp
// ^^^ most V3 is _${version}-
// http://packages.couchbase.com/releases/3.0.0/couchbase-server-community-3.0.0-centos6.x86_64.rpm
// ^^^ but RHEL is -${version}-
-
- @Nullable private final OsDetails os;
- @Nonnull private final boolean isV3OrLater;
- @Nonnull private final String context;
- @Nonnull private final String osName;
- @Nonnull private final boolean isRpm;
- @Nonnull private final boolean is64bit;
-
+
+ @Nullable
+ private final OsDetails os;
+ @Nonnull
+ private final boolean isV3OrLater;
+ @Nonnull
+ private final String context;
+ @Nonnull
+ private final String osName;
+ @Nonnull
+ private final boolean isRpm;
+ @Nonnull
+ private final boolean is64bit;
+
public DownloadLinkSegmentComputer(@Nullable OsDetails os, boolean isV3OrLater, @Nonnull String context) {
this.os = os;
this.isV3OrLater = isV3OrLater;
this.context = context;
- if (os==null) {
+ if (os == null) {
// guess centos as RPM is sensible default
- log.warn("No details known for OS of "+context+"; assuming 64-bit RPM distribution of Couchbase");
+ log.warn("No details known for OS of " + context + "; assuming 64-bit RPM distribution of Couchbase");
osName = "centos";
isRpm = true;
is64bit = true;
@@ -256,45 +248,51 @@ public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver imp
isRpm = !(osName.contains("deb") || osName.contains("ubuntu"));
is64bit = os.is64bit();
}
- /** separator after the version number used to be _ but is - in 3.0 and later */
+
+ /**
+ * separator after the version number used to be _ but is - in 3.0 and later
+ */
public String getPreVersionSeparator() {
if (!isV3OrLater) return "_";
if (isRpm) return "-";
return "_";
}
+
public String getOsTag() {
// couchbase only provide certain versions; if on other platforms let's suck-it-and-see
String family;
if (osName.contains("debian")) family = "debian7_";
else if (osName.contains("ubuntu")) family = "ubuntu12.04_";
- else if (osName.contains("centos") || osName.contains("rhel") || (osName.contains("red") && osName.contains("hat")))
+ else if (osName.contains("centos") || osName.contains("rhel") || (osName.contains("red") && osName.contains("hat")))
family = "centos6.";
else {
- log.warn("Unrecognised OS "+os+" of "+context+"; assuming RPM distribution of Couchbase");
+ log.warn("Unrecognised OS " + os + " of " + context + "; assuming RPM distribution of Couchbase");
family = "centos6.";
}
if (!is64bit && !isV3OrLater) {
// NB: 32-bit binaries aren't (yet?) available for v30
- log.warn("32-bit binaries for Couchbase might not be available, when deploying "+context);
+ log.warn("32-bit binaries for Couchbase might not be available, when deploying " + context);
}
String arch = !is64bit ? "x86" : !isRpm && isV3OrLater ? "amd64" : "x86_64";
String fileExtension = isRpm ? ".rpm" : ".deb";
-
+
if (isV3OrLater)
return family + arch + fileExtension;
else
return arch + fileExtension;
}
+
public String getOsTagWithPrefix() {
- return (!isV3OrLater ? "_" : "-") + getOsTag();
+ return (!isV3OrLater ? "_" : "-") + getOsTag();
}
}
-
+
@Override
public String getDownloadLinkOsTagWithPrefix() {
- return newDownloadLinkSegmentComputer().getOsTagWithPrefix();
+ return newDownloadLinkSegmentComputer().getOsTagWithPrefix();
}
+
@Override
public String getDownloadLinkPreVersionSeparator() {
return newDownloadLinkSegmentComputer().getPreVersionSeparator();
@@ -319,68 +317,76 @@ public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver imp
}
private String getWebPort() {
- return ""+entity.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT);
+ return "" + entity.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT);
}
private String getCouchbaseHostnameAndCredentials() {
- return format("-c localhost:%s -u %s -p %s", getWebPort(), getUsername(), getPassword());
+ return format("-c %s:%s -u %s -p %s", getSubnetHostname(), getWebPort(), getUsername(), getPassword());
}
private String getCouchbaseHostnameAndPort() {
- return format("-c localhost:%s", getWebPort());
+ return format("-c %s:%s", getSubnetHostname(), getWebPort());
}
private String getClusterInitRamSize() {
return entity.getConfig(CouchbaseNode.COUCHBASE_CLUSTER_INIT_RAM_SIZE).toString();
}
-
+
@Override
public void rebalance() {
entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "explicitly started");
newScript("rebalance")
.body.append(
- couchbaseCli("rebalance") +
- getCouchbaseHostnameAndCredentials())
+ couchbaseCli("rebalance") + getCouchbaseHostnameAndCredentials())
.failOnNonZeroResultCode()
.execute();
-
+
// wait until the re-balance is started
// (if it's quick, this might miss it, but it will only block for 30s if so)
Repeater.create()
- .backoff(Duration.millis(10), 2, Duration.millis(500))
- .limitTimeTo(Duration.THIRTY_SECONDS)
- .until(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- for (String nodeHostAndPort : CouchbaseNodeSshDriver.this.getNodesHostAndPort()) {
- if (isNodeRebalancing(nodeHostAndPort)) {
- return true;
- }
- }
- return false;
- }
- })
- .run();
-
+ .backoff(Duration.millis(10), 2, Duration.millis(500))
+ .limitTimeTo(Duration.THIRTY_SECONDS)
+ .until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ for (HostAndPort nodeHostAndPort : getNodesHostAndPort()) {
+ if (isNodeRebalancing(nodeHostAndPort.toString())) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+ ).run();
+
entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "waiting for completion");
- // NB: a sensor feed will also update this
-
- // then wait until the re-balance is complete
- boolean completed = Repeater.create()
- .backoff(Duration.ONE_SECOND, 1.2, Duration.TEN_SECONDS)
- .limitTimeTo(Duration.FIVE_MINUTES)
- .until(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- for (String nodeHostAndPort : getNodesHostAndPort()) {
- if (isNodeRebalancing(nodeHostAndPort)) {
- return false;
+ // Wait until the Couchbase node finishes the re-balancing
+ Task<Boolean> reBalance = TaskBuilder.<Boolean>builder()
+ .name("Waiting until node is rebalancing")
+ .body(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ return Repeater.create()
+ .backoff(Duration.ONE_SECOND, 1.2, Duration.TEN_SECONDS)
+ .limitTimeTo(Duration.FIVE_MINUTES)
+ .until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ for (HostAndPort nodeHostAndPort : getNodesHostAndPort()) {
+ if (isNodeRebalancing(nodeHostAndPort.toString())) {
+ return false;
+ }
+ }
+ return true;
+ }
+ })
+ .run();
}
- }
- return true;
- }
- })
- .run();
+ })
+ .build();
+ Boolean completed = DynamicTasks.queueIfPossible(reBalance)
+ .orSubmitAndBlock()
+ .andWaitForSuccess();
if (completed) {
entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "completed");
ServiceStateLogic.ServiceNotUpLogic.clearNotUpIndicator(getEntity(), "rebalancing");
@@ -392,29 +398,18 @@ public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver imp
}
}
- private Iterable<String> getNodesHostAndPort() {
- Function<JsonElement, Iterable<String>> getNodesAsList = new Function<JsonElement, Iterable<String>>() {
- @Override public Iterable<String> apply(JsonElement input) {
- if (input == null) {
- return Collections.emptyList();
- }
- Collection<String> names = Lists.newArrayList();
- JsonArray nodes = input.getAsJsonArray();
- for (JsonElement element : nodes) {
- // NOTE: the 'hostname' element also includes the port
- names.add(element.getAsJsonObject().get("hostname").toString().replace("\"", ""));
- }
- return names;
- }
- };
- HttpToolResponse nodesResponse = getApiResponse(String.format("http://%s:%s/pools/nodes", getHostname(), getWebPort()));
- return Functionals.chain(
- HttpValueFunctions.jsonContents(),
- JsonFunctions.walkN("nodes"),
- getNodesAsList
- ).apply(nodesResponse);
+ private Iterable<HostAndPort> getNodesHostAndPort() {
+ Group group = Iterables.getFirst(getEntity().getGroups(), null);
+ if (group == null) return Lists.newArrayList();
+ return Iterables.transform(group.getAttribute(CouchbaseCluster.COUCHBASE_CLUSTER_UP_NODES),
+ new Function<Entity, HostAndPort>() {
+ @Override
+ public HostAndPort apply(Entity input) {
+ return BrooklynAccessUtils.getBrooklynAccessibleAddress(input, input.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
+ }
+ });
}
-
+
private boolean isNodeRebalancing(String nodeHostAndPort) {
HttpToolResponse response = getApiResponse("http://" + nodeHostAndPort + "/pools/default/rebalanceProgress");
if (response.getResponseCode() != 200) {
@@ -422,64 +417,19 @@ public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver imp
}
return !"none".equals(HttpValueFunctions.jsonContents("status", String.class).apply(response));
}
-
+
private HttpToolResponse getApiResponse(String uri) {
return HttpTool.httpGet(HttpTool.httpClientBuilder()
- // the uri is required by the HttpClientBuilder in order to set the AuthScope of the credentials
- .uri(uri)
- .credentials(new UsernamePasswordCredentials(getUsername(), getPassword()))
- .build(),
- URI.create(uri),
- ImmutableMap.<String, String>of());
+ // the uri is required by the HttpClientBuilder in order to set the AuthScope of the credentials
+ .uri(uri)
+ .credentials(new UsernamePasswordCredentials(getUsername(), getPassword()))
+ .build(),
+ URI.create(uri),
+ ImmutableMap.<String, String>of());
}
-
+
@Override
public void serverAdd(String serverToAdd, String username, String password) {
- // TODO the POST is failing with SocketException: Connection reset
- // removing any data makes the problem go away; i suspect it is the combo of:
- // credentials, an explicit port, and content.
- // but i do not know how to fix it...
-//// curl -u Administrator:password\
-//// 192.168.60.101:8091/controller/addNode \
-//// -d "hostname=192.168.60.103&user=Administrator&password=password"
-// String baseUrl = Preconditions.checkNotNull(getEntity().getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_URL), "web admin URL not available");
-// String uri = Urls.mergePaths(baseUrl, "controller/addNode");
-// URI uriU = URI.create(uri);
-//
-// HttpClient client = HttpTool.httpClientBuilder()
-// // the uri is required by the HttpClientBuilder in order to set the AuthScope of the credentials
-// .uri(uriU.getScheme()+"://"+uriU.getHost())
-// .credentials(new UsernamePasswordCredentials(getUsername(), getPassword()))
-// .build();
-// client.getParams().setParameter("http.socket.timeout", new Integer(0));
-// client.getParams().setParameter("http.connection.stalecheck", new Boolean(true));
-//
-// HttpToolResponse response = HttpTool.httpPost(client,
-// URI.create(uri),
-//// ImmutableMap.of(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_FORM_URLENCODED.getMimeType()),
-// // TODO do we need the above?
-// ImmutableMap.<String,String>of(),
-//
-// ("hostname="+Urls.encode(serverToAdd)+
-// "&user"+Urls.encode(username)+
-// "&password"+Urls.encode(password)).getBytes()
-// // the following two work
-//// "".getBytes()
-//// ImmutableMap.<String,String>of()
-// );
-// if (response.getResponseCode()==200) {
-// log.debug("Completed addNode call for "+serverToAdd+" via REST to "+getEntity()+": "+response.getContentAsString());
-// } else {
-// log.warn("Failed addNode call for "+serverToAdd+" via REST to "+getEntity()+": "+response.getResponseCode()+" / "+response.getContentAsString());
-// throw new IllegalStateException("Failed addNode call for "+serverToAdd+" via REST to "+getEntity()+": "+response.getResponseCode()+" / "+response.getContentAsString());
-// }
-
- // TODO would like a WebTasks API such as this:
-// DynamicTasks.queue(WebTasks.get(baseUrl).subpath("controller/addNode").credentials(getUsername(), getPassword())
-// .queryParam("hostname", serverToAdd).queryParam("user", username).queryParam("password", password)
-// .summary("REST addNode "+serverToAdd)).getUnchecked();
-
- // or, via CLI:
newScript("serverAdd").body.append(couchbaseCli("server-add")
+ getCouchbaseHostnameAndCredentials() +
" --server-add=" + BashStringEscapes.wrapBash(serverToAdd) +
@@ -503,60 +453,60 @@ public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver imp
@Override
public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica) {
- log.info("Adding bucket: {} to cluster {} primary node: {}", new Object[] { bucketName, CouchbaseClusterImpl.getClusterOrNode(getEntity()), getEntity() });
-
+ log.info("Adding bucket: {} to cluster {} primary node: {}", new Object[]{bucketName, CouchbaseClusterImpl.getClusterOrNode(getEntity()), getEntity()});
+
newScript("bucketCreate").body.append(couchbaseCli("bucket-create")
- + getCouchbaseHostnameAndCredentials() +
- " --bucket=" + BashStringEscapes.wrapBash(bucketName) +
- " --bucket-type=" + BashStringEscapes.wrapBash(bucketType) +
- " --bucket-port=" + bucketPort +
- " --bucket-ramsize=" + bucketRamSize +
- " --bucket-replica=" + bucketReplica)
- .failOnNonZeroResultCode()
- .execute();
+ + getCouchbaseHostnameAndCredentials() +
+ " --bucket=" + BashStringEscapes.wrapBash(bucketName) +
+ " --bucket-type=" + BashStringEscapes.wrapBash(bucketType) +
+ " --bucket-port=" + bucketPort +
+ " --bucket-ramsize=" + bucketRamSize +
+ " --bucket-replica=" + bucketReplica)
+ .failOnNonZeroResultCode()
+ .execute();
}
@Override
public void addReplicationRule(Entity toCluster, String fromBucket, String toBucket) {
DynamicTasks.queue(DependentConfiguration.attributeWhenReady(toCluster, Attributes.SERVICE_UP)).getUnchecked();
-
+
String destName = CouchbaseClusterImpl.getClusterName(toCluster);
-
- log.info("Setting up XDCR for "+fromBucket+" from "+CouchbaseClusterImpl.getClusterName(getEntity())+" (via "+getEntity()+") "
- + "to "+destName+" ("+toCluster+")");
-
+
+ log.info("Setting up XDCR for " + fromBucket + " from " + CouchbaseClusterImpl.getClusterName(getEntity()) + " (via " + getEntity() + ") "
+ + "to " + destName + " (" + toCluster + ")");
+
Entity destPrimaryNode = toCluster.getAttribute(CouchbaseCluster.COUCHBASE_PRIMARY_NODE);
String destHostname = destPrimaryNode.getAttribute(Attributes.HOSTNAME);
String destUsername = toCluster.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME);
String destPassword = toCluster.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD);
// on the REST API there is mention of a 'type' 'continuous' but i don't see other refs to this
-
+
// PROTOCOL Select REST protocol or memcached for replication. xmem indicates memcached while capi indicates REST protocol.
// looks like xmem is the default; leave off for now
// String replMode = "xmem";
-
+
DynamicTasks.queue(TaskTags.markInessential(SshEffectorTasks.ssh(
- couchbaseCli("xdcr-setup") +
- getCouchbaseHostnameAndCredentials() +
- " --create" +
- " --xdcr-cluster-name="+BashStringEscapes.wrapBash(destName) +
- " --xdcr-hostname="+BashStringEscapes.wrapBash(destHostname) +
- " --xdcr-username="+BashStringEscapes.wrapBash(destUsername) +
- " --xdcr-password="+BashStringEscapes.wrapBash(destPassword)
- ).summary("create xdcr destination "+destName).newTask()));
+ couchbaseCli("xdcr-setup") +
+ getCouchbaseHostnameAndCredentials() +
+ " --create" +
+ " --xdcr-cluster-name=" + BashStringEscapes.wrapBash(destName) +
+ " --xdcr-hostname=" + BashStringEscapes.wrapBash(destHostname) +
+ " --xdcr-username=" + BashStringEscapes.wrapBash(destUsername) +
+ " --xdcr-password=" + BashStringEscapes.wrapBash(destPassword)
+ ).summary("create xdcr destination " + destName).newTask()));
// would be nice to auto-create bucket, but we'll need to know the parameters; the port in particular is tedious
// ((CouchbaseNode)destPrimaryNode).bucketCreate(toBucket, "couchbase", null, 0, 0);
-
+
DynamicTasks.queue(SshEffectorTasks.ssh(
- couchbaseCli("xdcr-replicate") +
- getCouchbaseHostnameAndCredentials() +
- " --create" +
- " --xdcr-cluster-name="+BashStringEscapes.wrapBash(destName) +
- " --xdcr-from-bucket="+BashStringEscapes.wrapBash(fromBucket) +
- " --xdcr-to-bucket="+BashStringEscapes.wrapBash(toBucket)
+ couchbaseCli("xdcr-replicate") +
+ getCouchbaseHostnameAndCredentials() +
+ " --create" +
+ " --xdcr-cluster-name=" + BashStringEscapes.wrapBash(destName) +
+ " --xdcr-from-bucket=" + BashStringEscapes.wrapBash(fromBucket) +
+ " --xdcr-to-bucket=" + BashStringEscapes.wrapBash(toBucket)
// + " --xdcr-replication-mode="+replMode
- ).summary("configure replication for "+fromBucket+" to "+destName+":"+toBucket).newTask());
+ ).summary("configure replication for " + fromBucket + " to " + destName + ":" + toBucket).newTask());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/1784c21d/software/nosql/src/main/resources/couchbase-logo.png
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/resources/couchbase-logo.png b/software/nosql/src/main/resources/couchbase-logo.png
new file mode 100644
index 0000000..94daf5d
Binary files /dev/null and b/software/nosql/src/main/resources/couchbase-logo.png differ
[2/2] incubator-brooklyn git commit: This closes #317
Posted by gr...@apache.org.
This closes #317
* github/pr/317:
Couchbase changes for Clocker
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/59208f56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/59208f56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/59208f56
Branch: refs/heads/master
Commit: 59208f56a264d85d68ccc51bc9f2acf457dc1107
Parents: 27df750 1784c21
Author: Andrew Kennedy <gr...@apache.org>
Authored: Wed Nov 12 13:49:02 2014 +0000
Committer: Andrew Kennedy <gr...@apache.org>
Committed: Wed Nov 12 13:49:02 2014 +0000
----------------------------------------------------------------------
.../nosql/couchbase/CouchbaseClusterImpl.java | 18 +-
.../nosql/couchbase/CouchbaseNodeImpl.java | 83 ++--
.../nosql/couchbase/CouchbaseNodeSshDriver.java | 378 ++++++++-----------
.../nosql/src/main/resources/couchbase-logo.png | Bin 0 -> 88089 bytes
4 files changed, 216 insertions(+), 263 deletions(-)
----------------------------------------------------------------------