You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/08/06 18:32:18 UTC
[04/26] incubator-brooklyn git commit: [BROOKLYN-162] Renaming of the
NoSQL packages
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerIntegrationTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerIntegrationTest.java
deleted file mode 100644
index 4c7e08c..0000000
--- a/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerIntegrationTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.nosql.solr;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-import org.apache.solr.common.SolrDocument;
-import org.testng.annotations.Test;
-
-import brooklyn.entity.basic.Entities;
-import brooklyn.entity.proxying.EntitySpec;
-import brooklyn.entity.trait.Startable;
-import brooklyn.test.EntityTestUtils;
-import brooklyn.util.collections.MutableMap;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-
-/**
- * Solr integration tests.
- *
- * Test the operation of the {@link SolrServer} class.
- */
-public class SolrServerIntegrationTest extends AbstractSolrServerTest {
-
- /**
- * Test that a node starts and sets SERVICE_UP correctly.
- */
- @Test(groups = "Integration")
- public void canStartupAndShutdown() {
- solr = app.createAndManageChild(EntitySpec.create(SolrServer.class));
- app.start(ImmutableList.of(testLocation));
-
- EntityTestUtils.assertAttributeEqualsEventually(solr, Startable.SERVICE_UP, true);
- Entities.dumpInfo(app);
-
- solr.stop();
-
- EntityTestUtils.assertAttributeEqualsEventually(solr, Startable.SERVICE_UP, false);
- }
-
- /**
- * Test that a core can be created and used with SolrJ client.
- */
- @Test(groups = "Integration")
- public void testConnection() throws Exception {
- solr = app.createAndManageChild(EntitySpec.create(SolrServer.class)
- .configure(SolrServer.SOLR_CORE_CONFIG, ImmutableMap.of("example", "classpath://solr/example.tgz")));
- app.start(ImmutableList.of(testLocation));
-
- EntityTestUtils.assertAttributeEqualsEventually(solr, Startable.SERVICE_UP, true);
-
- SolrJSupport client = new SolrJSupport(solr, "example");
-
- Iterable<SolrDocument> results = client.getDocuments();
- assertTrue(Iterables.isEmpty(results));
-
- client.addDocument(MutableMap.<String, Object>of("id", "1", "description", "first"));
- client.addDocument(MutableMap.<String, Object>of("id", "2", "description", "second"));
- client.addDocument(MutableMap.<String, Object>of("id", "3", "description", "third"));
- client.commit();
-
- results = client.getDocuments();
- assertEquals(Iterables.size(results), 3);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerLiveTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerLiveTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerLiveTest.java
deleted file mode 100644
index 82fb107..0000000
--- a/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerLiveTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.nosql.solr;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-import java.util.Map;
-
-import org.apache.solr.common.SolrDocument;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import brooklyn.entity.proxying.EntitySpec;
-import brooklyn.entity.trait.Startable;
-import brooklyn.test.EntityTestUtils;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.text.Strings;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-
-/**
- * Solr live tests.
- *
- * Test the operation of the {@link SolrServer} class using the jclouds {@code rackspace-cloudservers-uk}
- * and {@code aws-ec2} providers, with different OS images. The tests use the {@link SolrJSupport} class
- * to exercise the node, and will need to have {@code brooklyn.jclouds.provider.identity} and {@code .credential}
- * set, usually in the {@code .brooklyn/brooklyn.properties} file.
- */
-public class SolrServerLiveTest extends AbstractSolrServerTest {
-
- private static final Logger log = LoggerFactory.getLogger(SolrServerLiveTest.class);
-
- @DataProvider(name = "virtualMachineData")
- public Object[][] provideVirtualMachineData() {
- return new Object[][] { // ImageId, Provider, Region, Description (for logging)
- new Object[] { "eu-west-1/ami-0307d674", "aws-ec2", "eu-west-1", "Ubuntu Server 14.04 LTS (HVM), SSD Volume Type" },
- new Object[] { "LON/f9b690bf-88eb-43c2-99cf-391f2558732e", "rackspace-cloudservers-uk", "", "Ubuntu 12.04 LTS (Precise Pangolin)" },
- new Object[] { "LON/a84b1592-6817-42da-a57c-3c13f3cfc1da", "rackspace-cloudservers-uk", "", "CentOS 6.5 (PVHVM)" },
- };
- }
-
- @Test(groups = "Live", dataProvider = "virtualMachineData")
- protected void testOperatingSystemProvider(String imageId, String provider, String region, String description) throws Exception {
- log.info("Testing Solr on {}{} using {} ({})", new Object[] { provider, Strings.isNonEmpty(region) ? ":" + region : "", description, imageId });
-
- Map<String, String> properties = MutableMap.of("imageId", imageId);
- testLocation = app.getManagementContext().getLocationRegistry()
- .resolve(provider + (Strings.isNonEmpty(region) ? ":" + region : ""), properties);
- solr = app.createAndManageChild(EntitySpec.create(SolrServer.class)
- .configure(SolrServer.SOLR_CORE_CONFIG, ImmutableMap.of("example", "classpath://solr/example.tgz")));
- app.start(ImmutableList.of(testLocation));
-
- EntityTestUtils.assertAttributeEqualsEventually(solr, Startable.SERVICE_UP, true);
-
- SolrJSupport client = new SolrJSupport(solr, "example");
-
- Iterable<SolrDocument> results = client.getDocuments();
- assertTrue(Iterables.isEmpty(results));
-
- client.addDocument(MutableMap.<String, Object>of("id", "1", "description", "first"));
- client.addDocument(MutableMap.<String, Object>of("id", "2", "description", "second"));
- client.addDocument(MutableMap.<String, Object>of("id", "3", "description", "third"));
- client.commit();
-
- results = client.getDocuments();
- assertEquals(Iterables.size(results), 3);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AbstractCassandraNodeTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AbstractCassandraNodeTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AbstractCassandraNodeTest.java
new file mode 100644
index 0000000..ab158bd
--- /dev/null
+++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AbstractCassandraNodeTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode;
+import org.testng.annotations.BeforeMethod;
+
+import brooklyn.entity.BrooklynAppLiveTestSupport;
+import brooklyn.location.Location;
+
+/**
+ * Cassandra test framework for integration and live tests.
+ */
+public class AbstractCassandraNodeTest extends BrooklynAppLiveTestSupport {
+
+ protected Location testLocation;
+ protected CassandraNode cassandra;
+
+ @BeforeMethod(alwaysRun = true)
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ testLocation = app.newLocalhostProvisioningLocation();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AstyanaxSupport.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AstyanaxSupport.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AstyanaxSupport.java
new file mode 100644
index 0000000..b7587d7
--- /dev/null
+++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AstyanaxSupport.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+
+import brooklyn.entity.basic.Attributes;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.text.Identifiers;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.netflix.astyanax.AstyanaxContext;
+import com.netflix.astyanax.Cluster;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
+import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.connectionpool.exceptions.SchemaDisagreementException;
+import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
+import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
+import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
+import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.model.ColumnFamily;
+import com.netflix.astyanax.model.ColumnList;
+import com.netflix.astyanax.serializers.StringSerializer;
+import com.netflix.astyanax.thrift.ThriftFamilyFactory;
+
+/**
+ * Cassandra testing using Astyanax API.
+ */
+public class AstyanaxSupport {
+ private static final Logger log = LoggerFactory.getLogger(AstyanaxSupport.class);
+
+ public final String clusterName;
+ public final String hostname;
+ public final int thriftPort;
+
+ public AstyanaxSupport(CassandraNode node) {
+ this(node.getClusterName(), node.getAttribute(Attributes.HOSTNAME), node.getThriftPort());
+ }
+
+ public AstyanaxSupport(String clusterName, String hostname, int thriftPort) {
+ this.clusterName = clusterName;
+ this.hostname = hostname;
+ this.thriftPort = thriftPort;
+ }
+
+ public AstyanaxContext<Keyspace> newAstyanaxContextForKeyspace(String keyspace) {
+ AstyanaxContext<Keyspace> context = new AstyanaxContext.Builder()
+ .forCluster(clusterName)
+ .forKeyspace(keyspace)
+ .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
+ .setDiscoveryType(NodeDiscoveryType.NONE))
+ .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("BrooklynPool")
+ .setPort(thriftPort)
+ .setMaxConnsPerHost(1)
+ .setConnectTimeout(5000) // 5s (value is in milliseconds)
+ .setSeeds(String.format("%s:%d", hostname, thriftPort)))
+ .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
+ .buildKeyspace(ThriftFamilyFactory.getInstance());
+
+ context.start(); // returned already started; callers must call shutdown() when done
+ return context;
+ }
+
+ public AstyanaxContext<Cluster> newAstyanaxContextForCluster() {
+ AstyanaxContext<Cluster> context = new AstyanaxContext.Builder()
+ .forCluster(clusterName)
+ .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
+ .setDiscoveryType(NodeDiscoveryType.NONE))
+ .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("BrooklynPool")
+ .setPort(thriftPort)
+ .setMaxConnsPerHost(1)
+ .setConnectTimeout(5000) // 5s (value is in milliseconds)
+ .setSeeds(String.format("%s:%d", hostname, thriftPort)))
+ .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
+ .buildCluster(ThriftFamilyFactory.getInstance());
+
+ context.start(); // returned already started; callers must call shutdown() when done
+ return context;
+ }
+
+ public static class AstyanaxSample extends AstyanaxSupport {
+
+ public static class Builder {
+ protected CassandraNode node;
+ protected String clusterName;
+ protected String hostname;
+ protected Integer thriftPort;
+ protected String columnFamilyName = Identifiers.makeRandomId(8);
+
+ public Builder node(CassandraNode val) {
+ this.node = val;
+ clusterName = node.getClusterName();
+ hostname = node.getAttribute(Attributes.HOSTNAME);
+ thriftPort = node.getThriftPort();
+ return this;
+ }
+ public Builder host(String clusterName, String hostname, int thriftPort) {
+ this.clusterName = clusterName;
+ this.hostname = hostname;
+ this.thriftPort = thriftPort;
+ return this;
+ }
+ public Builder columnFamilyName(String val) {
+ this.columnFamilyName = val;
+ return this;
+ }
+ public AstyanaxSample build() {
+ return new AstyanaxSample(this);
+ }
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public final String columnFamilyName;
+ public final ColumnFamily<String, String> sampleColumnFamily;
+
+ public AstyanaxSample(CassandraNode node) {
+ this(builder().node(node));
+ }
+
+ public AstyanaxSample(String clusterName, String hostname, int thriftPort) {
+ this(builder().host(clusterName, hostname, thriftPort));
+ }
+
+ protected AstyanaxSample(Builder builder) {
+ super(builder.clusterName, builder.hostname, builder.thriftPort);
+ columnFamilyName = checkNotNull(builder.columnFamilyName, "columnFamilyName");
+ sampleColumnFamily = new ColumnFamily<String, String>(
+ columnFamilyName, // Column Family Name
+ StringSerializer.get(), // Key Serializer
+ StringSerializer.get()); // Column Serializer
+ }
+
+ /**
+ * Exercise the {@link CassandraNode} using the Astyanax API.
+ */
+ public void astyanaxTest() throws Exception {
+ String keyspaceName = "BrooklynTests_"+Identifiers.makeRandomId(8);
+ writeData(keyspaceName);
+ readData(keyspaceName);
+ }
+
+ /**
+ * Write to a {@link CassandraNode} using the Astyanax API.
+ * @throws ConnectionException
+ */
+ public void writeData(String keyspaceName) throws ConnectionException {
+ // Create context
+ AstyanaxContext<Keyspace> context = newAstyanaxContextForKeyspace(keyspaceName);
+ try {
+ Keyspace keyspace = context.getEntity();
+ try {
+ checkNull(keyspace.describeKeyspace().getColumnFamily(columnFamilyName), "key space for column family "+columnFamilyName);
+ } catch (Exception ek) {
+ // (Re) Create keyspace if needed (including if family name already existed,
+ // e.g. due to a timeout on previous attempt)
+ log.debug("repairing Cassandra error by re-creating keyspace "+keyspace+": "+ek);
+ try {
+ log.debug("dropping Cassandra keyspace "+keyspace);
+ keyspace.dropKeyspace();
+ } catch (Exception e) {
+ /* Ignore */
+ log.debug("Cassandra keyspace "+keyspace+" could not be dropped (probably did not exist): "+e);
+ }
+ try {
+ keyspace.createKeyspace(ImmutableMap.<String, Object>builder()
+ .put("strategy_options", ImmutableMap.<String, Object>of("replication_factor", "1"))
+ .put("strategy_class", "SimpleStrategy")
+ .build());
+ } catch (SchemaDisagreementException e) {
+ // discussion (but not terribly helpful) at http://stackoverflow.com/questions/6770894/schemadisagreementexception
+ // let's just try again after a delay
+ // (seems to have no effect; trying to fix by starting first node before others)
+ log.warn("error creating Cassandra keyspace "+keyspace+" (retrying): "+e);
+ Time.sleep(Duration.FIVE_SECONDS);
+ keyspace.createKeyspace(ImmutableMap.<String, Object>builder()
+ .put("strategy_options", ImmutableMap.<String, Object>of("replication_factor", "1"))
+ .put("strategy_class", "SimpleStrategy")
+ .build());
+ }
+ }
+
+ assertNull(keyspace.describeKeyspace().getColumnFamily("Rabbits"), "key space for arbitrary column family Rabbits");
+ assertNull(keyspace.describeKeyspace().getColumnFamily(columnFamilyName), "key space for column family "+columnFamilyName);
+
+ // Create column family
+ keyspace.createColumnFamily(sampleColumnFamily, null);
+
+ // Insert rows
+ MutationBatch m = keyspace.prepareMutationBatch();
+ m.withRow(sampleColumnFamily, "one")
+ .putColumn("name", "Alice", null)
+ .putColumn("company", "Cloudsoft Corp", null);
+ m.withRow(sampleColumnFamily, "two")
+ .putColumn("name", "Bob", null)
+ .putColumn("company", "Cloudsoft Corp", null)
+ .putColumn("pet", "Cat", null);
+
+ OperationResult<Void> insert = m.execute();
+ assertEquals(insert.getHost().getHostName(), hostname);
+ assertTrue(insert.getLatency() > 0L);
+ } finally {
+ context.shutdown();
+ }
+ }
+
+ /**
+ * Read from a {@link CassandraNode} using the Astyanax API.
+ * @throws ConnectionException
+ */
+ public void readData(String keyspaceName) throws ConnectionException {
+ // Create context
+ AstyanaxContext<Keyspace> context = newAstyanaxContextForKeyspace(keyspaceName);
+ try {
+ Keyspace keyspace = context.getEntity();
+
+ // Query data
+ OperationResult<ColumnList<String>> query = keyspace.prepareQuery(sampleColumnFamily)
+ .getKey("one")
+ .execute();
+ assertEquals(query.getHost().getHostName(), hostname);
+ assertTrue(query.getLatency() > 0L);
+
+ ColumnList<String> columns = query.getResult();
+ assertEquals(columns.size(), 2);
+
+ // Lookup columns in response by name
+ String name = columns.getColumnByName("name").getStringValue();
+ assertEquals(name, "Alice");
+
+ // Iterate through the columns
+ for (Column<String> c : columns) {
+ assertTrue(ImmutableList.of("name", "company").contains(c.getName()));
+ }
+ } finally {
+ context.shutdown();
+ }
+ }
+
+
+ /**
+ * Writes the sample data, retrying up to {@code numRetries} times, and returns the keyspace
+ * name that was actually written to. Each attempt uses a distinct name (prefix + "_" + attempt
+ * index), because a failure could be a response timeout where the keyspace really was created,
+ * so a later attempt with the same name could fail (we assert that it did not already exist).
+ */
+ public String writeData(String keyspacePrefix, int numRetries) throws ConnectionException {
+ int retryCount = 0;
+ while (true) {
+ try {
+ String keyspaceName = keyspacePrefix + "_" + retryCount;
+ writeData(keyspaceName);
+ return keyspaceName;
+ } catch (Exception e) {
+ log.warn("Error writing data - attempt "+(retryCount+1)+" of "+(numRetries+1)+": "+e, e);
+ if (++retryCount > numRetries)
+ throw Exceptions.propagate(e);
+ }
+ }
+ }
+
+ /**
+ * Repeatedly tries to read data from the given keyspace name. Asserts that the data is the
+ * same as would be written by calling {@code writeData(keyspaceName)}.
+ */
+ public void readData(String keyspaceName, int numRetries) throws ConnectionException {
+ int retryCount = 0;
+ while (true) {
+ try {
+ readData(keyspaceName);
+ return;
+ } catch (Exception e) {
+ log.warn("Error reading data - attempt "+(retryCount+1)+" of "+(numRetries+1)+": "+e, e);
+ if (++retryCount > numRetries)
+ throw Exceptions.propagate(e);
+ }
+ }
+ }
+
+ /**
+ * Like {@link Assert#assertNull(Object, String)}, except throws IllegalStateException instead
+ */
+ private void checkNull(Object obj, String msg) {
+ if (obj != null) {
+ throw new IllegalStateException("Not null: "+msg+"; obj="+obj);
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ AstyanaxSample support = new AstyanaxSample("ignored", "ec2-79-125-32-2.eu-west-1.compute.amazonaws.com", 9160);
+ AstyanaxContext<Cluster> context = support.newAstyanaxContextForCluster();
+ try {
+ System.out.println(context.getEntity().describeSchemaVersions());
+ } finally {
+ context.shutdown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterIntegrationTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterIntegrationTest.java
new file mode 100644
index 0000000..ddd6243
--- /dev/null
+++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterIntegrationTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.math.BigInteger;
+
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraDatacenter;
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode;
+import org.apache.brooklyn.entity.nosql.cassandra.TokenGenerators.PosNeg63TokenGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.BrooklynAppLiveTestSupport;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.trait.Startable;
+import brooklyn.location.Location;
+import brooklyn.test.Asserts;
+import brooklyn.test.EntityTestUtils;
+import brooklyn.util.collections.MutableMap;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+/**
+ * An integration test of the {@link CassandraDatacenter} entity.
+ *
+ * Tests that a one node cluster can be started on localhost and data can be written/read, using the Astyanax API.
+ *
+ * NOTE: If these tests fail with "Timeout waiting for SERVICE_UP" and "java.lang.IllegalStateException: Unable to contact any seeds!"
+ * or "java.lang.RuntimeException: Unable to gossip with any seeds" appears in the log, it may be that the broadcast_address
+ * (set to InetAddress.getLocalHost().getHostName()) is not resolving to the value specified in listen_address
+ * (InetAddress.getLocalHost().getHostAddress()). You can work round this issue by ensuring that your machine has only one
+ * address, e.g. by disabling wireless if you are also using a wired connection.
+ */
+public class CassandraDatacenterIntegrationTest extends BrooklynAppLiveTestSupport {
+
+ private static final Logger log = LoggerFactory.getLogger(CassandraDatacenterIntegrationTest.class);
+
+ protected Location testLocation;
+ protected CassandraDatacenter cluster;
+
+ @BeforeMethod(alwaysRun = true)
+ @Override
+ public void setUp() throws Exception {
+ CassandraNodeIntegrationTest.assertCassandraPortsAvailableEventually();
+ super.setUp();
+ testLocation = app.newLocalhostProvisioningLocation();
+ }
+
+ @AfterMethod(alwaysRun=true)
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ CassandraNodeIntegrationTest.assertCassandraPortsAvailableEventually();
+ }
+
+
+ @Test(groups = "Integration")
+ public void testStartAndShutdownClusterSizeOne() throws Exception {
+ EntitySpec<CassandraDatacenter> spec = EntitySpec.create(CassandraDatacenter.class)
+ .configure("initialSize", 1)
+ .configure("tokenShift", 42);
+ runStartAndShutdownClusterSizeOne(spec, true);
+ }
+
+ /**
+ * Cassandra v2 needs Java >= 1.7. If you have Java 6 as the default locally, then you can use
+ * something like {@code .configure("shell.env", MutableMap.of("JAVA_HOME", "/Library/Java/JavaVirtualMachines/jdk1.7.0_51.jdk/Contents/Home"))}
+ */
+ @Test(groups = "Integration")
+ public void testStartAndShutdownClusterSizeOneCassandraVersion2() throws Exception {
+ String version = "2.0.9";
+
+ EntitySpec<CassandraDatacenter> spec = EntitySpec.create(CassandraDatacenter.class)
+ .configure(CassandraNode.SUGGESTED_VERSION, version)
+ .configure("initialSize", 1);
+ runStartAndShutdownClusterSizeOne(spec, false);
+ }
+
+ /**
+ * Test that a single node cluster starts up and allows access via the Astyanax API.
+ * Only one node because Cassandra can only run one node per VM!
+ */
+ protected void runStartAndShutdownClusterSizeOne(EntitySpec<CassandraDatacenter> datacenterSpec, final boolean assertToken) throws Exception {
+ cluster = app.createAndManageChild(datacenterSpec);
+ assertEquals(cluster.getCurrentSize().intValue(), 0);
+
+ app.start(ImmutableList.of(testLocation));
+ Entities.dumpInfo(app);
+
+ final CassandraNode node = (CassandraNode) Iterables.get(cluster.getMembers(), 0);
+ String nodeAddr = checkNotNull(node.getAttribute(CassandraNode.HOSTNAME), "hostname") + ":" + checkNotNull(node.getAttribute(CassandraNode.THRIFT_PORT), "thriftPort");
+
+ EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.GROUP_SIZE, 1);
+ EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.CASSANDRA_CLUSTER_NODES, ImmutableList.of(nodeAddr));
+
+ EntityTestUtils.assertAttributeEqualsEventually(node, Startable.SERVICE_UP, true);
+ if (assertToken) {
+ PosNeg63TokenGenerator tg = new PosNeg63TokenGenerator();
+ tg.growingCluster(1);
+ EntityTestUtils.assertAttributeEqualsEventually(node, CassandraNode.TOKEN, tg.newToken().add(BigInteger.valueOf(42)));
+ }
+
+ // may take some time to be consistent (with new thrift_latency checks on the node,
+ // contactability should not be an issue, but consistency still might be)
+ Asserts.succeedsEventually(MutableMap.of("timeout", 120*1000), new Runnable() {
+ public void run() {
+ boolean open = CassandraDatacenterLiveTest.isSocketOpen(node);
+ Boolean consistant = open ? CassandraDatacenterLiveTest.areVersionsConsistent(node) : null;
+ Integer numPeers = node.getAttribute(CassandraNode.PEERS);
+ Integer liveNodeCount = node.getAttribute(CassandraNode.LIVE_NODE_COUNT);
+ String msg = "consistency: "
+ + (!open ? "unreachable" : consistant==null ? "error" : consistant)+"; "
+ + "peer group sizes: "+numPeers + "; live node count: " + liveNodeCount;
+ assertTrue(open, msg);
+ assertEquals(consistant, Boolean.TRUE, msg);
+ if (assertToken) {
+ assertEquals(numPeers, (Integer)1, msg);
+ } else {
+ assertTrue(numPeers != null && numPeers >= 1, msg);
+ }
+ assertEquals(liveNodeCount, (Integer)1, msg);
+ }});
+
+ CassandraDatacenterLiveTest.checkConnectionRepeatedly(2, 5, ImmutableList.of(node));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterLiveTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterLiveTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterLiveTest.java
new file mode 100644
index 0000000..d29bc1a
--- /dev/null
+++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterLiveTest.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.math.BigInteger;
+import java.net.Socket;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraDatacenter;
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode;
+import org.apache.brooklyn.entity.nosql.cassandra.AstyanaxSupport.AstyanaxSample;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.BrooklynAppLiveTestSupport;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.trait.Startable;
+import brooklyn.location.Location;
+import brooklyn.test.Asserts;
+import brooklyn.test.EntityTestUtils;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.text.Identifiers;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.netflix.astyanax.AstyanaxContext;
+import com.netflix.astyanax.Cluster;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+/**
+ * A live test of the {@link CassandraDatacenter} entity.
+ *
+ * Tests that a two node cluster can be started on Amazon EC2 and data written on one {@link CassandraNode}
+ * can be read from another, using the Astyanax API.
+ */
+public class CassandraDatacenterLiveTest extends BrooklynAppLiveTestSupport {
+
+ private static final Logger log = LoggerFactory.getLogger(CassandraDatacenterLiveTest.class);
+
+ private String provider =
+ "aws-ec2:eu-west-1";
+// "rackspace-cloudservers-uk";
+// "named:hpcloud-compute-at";
+// "localhost";
+// "jcloudsByon:(provider=\"aws-ec2\",region=\"us-east-1\",user=\"aled\",hosts=\"i-6f374743,i-35324219,i-1135453d\")";
+
+ protected Location testLocation;
+ protected CassandraDatacenter cluster;
+
+ @BeforeMethod(alwaysRun = true)
+ @Override
+ public void setUp() throws Exception {
+ super.setUp(); // parent fixture set-up runs first; mgmt is used on the next line
+ testLocation = mgmt.getLocationRegistry().resolve(provider); // turn the provider spec string into the Location the tests deploy to
+ }
+
+ @AfterMethod(alwaysRun=true)
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown(); // no test-specific clean-up here; delegates entirely to the parent fixture
+ }
+
+ /** Runs the standard cluster scenario on a two-node datacenter, using the single-token checks. */
+ @Test(groups = "Live")
+ public void testDatacenter() throws Exception {
+ final boolean usesVnodes = false;
+ EntitySpec<CassandraDatacenter> twoNodeSpec = EntitySpec.create(CassandraDatacenter.class)
+ .configure("initialSize", 2)
+ .configure("clusterName", "CassandraClusterLiveTest");
+ runCluster(twoNodeSpec, usesVnodes);
+ }
+
+ /** Runs the standard cluster scenario on a two-node datacenter with vnodes enabled. */
+ @Test(groups = "Live")
+ public void testDatacenterWithVnodes() throws Exception {
+ final boolean usesVnodes = true;
+ EntitySpec<CassandraDatacenter> vnodeSpec = EntitySpec.create(CassandraDatacenter.class)
+ .configure("initialSize", 2)
+ .configure(CassandraDatacenter.USE_VNODES, true)
+ .configure("clusterName", "CassandraClusterLiveTest");
+ runCluster(vnodeSpec, usesVnodes);
+ }
+
+ /*
+ * Runs the standard cluster scenario with vnodes enabled and a suggested Cassandra version of 2.0.9.
+ *
+ * TODO on some distros (e.g. CentOS?), it comes pre-installed with java 6. Installing java 7
+ * didn't seem to be enough. I also had to set JAVA_HOME:
+ * .configure("shell.env", MutableMap.of("JAVA_HOME", "/etc/alternatives/java_sdk_1.7.0"))
+ * However, that would break other deployments such as on Ubuntu where JAVA_HOME would be different.
+ */
+ @Test(groups = "Live")
+ public void testDatacenterWithVnodesVersion2() throws Exception {
+ final boolean usesVnodes = true;
+ EntitySpec<CassandraDatacenter> version2Spec = EntitySpec.create(CassandraDatacenter.class)
+ .configure("initialSize", 2)
+ .configure(CassandraNode.SUGGESTED_VERSION, "2.0.9")
+ .configure(CassandraDatacenter.USE_VNODES, true)
+ .configure("clusterName", "CassandraClusterLiveTest");
+ runCluster(version2Spec, usesVnodes);
+ }
+
+ @Test(groups = {"Live", "Acceptance"}, invocationCount=10)
+ public void testManyTimes() throws Exception {
+ testDatacenter(); // same scenario as testDatacenter; the invocationCount above asks TestNG to run it 10 times
+ }
+
+ /**
+ * Exercises a Cassandra datacenter end to end:
+ * <ol>
+ * <li>Creates the datacenter from the given spec and starts the app in the test location.
+ * <li>Waits for a group size of two, then checks node and token consistency.
+ * <li>Confirms access via the Astyanax API through the members.
+ * <li>Resizes to three members and repeats the token and connection checks.
+ * </ol>
+ *
+ * @param datacenterSpec spec of the datacenter to create as a child of the app
+ * @param usesVnodes whether to apply the vnode token assertions instead of the single-token ones
+ */
+ protected void runCluster(EntitySpec<CassandraDatacenter> datacenterSpec, boolean usesVnodes) throws Exception {
+ cluster = app.createAndManageChild(datacenterSpec);
+ assertEquals(cluster.getCurrentSize().intValue(), 0);
+
+ app.start(ImmutableList.of(testLocation));
+
+ // Wait for both initial members, then verify the cluster is up and healthy
+ EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.GROUP_SIZE, 2);
+ Entities.dumpInfo(app);
+ List<CassandraNode> initialMembers = castToCassandraNodes(cluster.getMembers());
+ assertNodesConsistent(initialMembers);
+
+ if (!usesVnodes) {
+ assertSingleTokenConsistent(initialMembers);
+ } else {
+ assertVnodeTokensConsistent(initialMembers);
+ }
+
+ // Can connect via Astyanax
+ checkConnectionRepeatedly(2, 5, initialMembers);
+
+ // Grow the cluster by one node and re-run the token and connection checks
+ cluster.resize(3);
+ assertEquals(cluster.getMembers().size(), 3, "members="+cluster.getMembers());
+ List<CassandraNode> resizedMembers = castToCassandraNodes(cluster.getMembers());
+ if (!usesVnodes) {
+ assertSingleTokenConsistent(resizedMembers);
+ } else {
+ assertVnodeTokensConsistent(resizedMembers);
+ }
+ checkConnectionRepeatedly(2, 5, cluster.getMembers());
+ }
+
+ /**
+ * Copies the given entities into a new list, casting each one to {@link CassandraNode}.
+ *
+ * @param rawnodes entities that are all expected to be Cassandra nodes
+ * @return a new list holding the same entities, in iteration order
+ * @throws ClassCastException if any entity is not a {@link CassandraNode}
+ */
+ protected static List<CassandraNode> castToCassandraNodes(Collection<? extends Entity> rawnodes) {
+ final List<CassandraNode> converted = Lists.newArrayList();
+ for (Entity candidate : rawnodes) {
+ CassandraNode asNode = (CassandraNode) candidate;
+ converted.add(asNode);
+ }
+ return converted;
+ }
+
+ /**
+ * Asserts, retrying with a two-minute timeout, that every given node reports service-up, accepts
+ * connections on its thrift port, sees a single schema version, and reports a live node count
+ * equal to the number of nodes passed in.
+ *
+ * @param nodes the nodes expected to make up the whole cluster
+ */
+ protected static void assertNodesConsistent(final List<CassandraNode> nodes) {
+ final Integer expectedLiveNodeCount = nodes.size();
+ // may take some time to be consistent (with new thrift_latency checks on the node,
+ // contactability should not be an issue, but consistency still might be)
+ Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), new Runnable() {
+ public void run() {
+ for (CassandraNode node : nodes) {
+ EntityTestUtils.assertAttributeEquals(node, Startable.SERVICE_UP, true);
+ String errmsg = "node="+node+"; hostname="+node.getAttribute(Attributes.HOSTNAME)+"; port="+node.getThriftPort();
+ assertTrue(isSocketOpen(node), errmsg);
+ // areVersionsConsistent returns null when the schema query itself fails; comparing with
+ // Boolean.TRUE reports that as an assertion failure carrying errmsg, rather than a
+ // NullPointerException from auto-unboxing the null
+ assertEquals(areVersionsConsistent(node), Boolean.TRUE, errmsg);
+ EntityTestUtils.assertAttributeEquals(node, CassandraNode.LIVE_NODE_COUNT, expectedLiveNodeCount);
+ }
+ }});
+ }
+
+ /**
+ * Asserts, retrying with a two-minute timeout, that each node is up, is configured with one token
+ * per node, reports a peer count equal to the number of nodes, and publishes exactly one token
+ * that no other node shares.
+ *
+ * @param nodes the nodes to check
+ */
+ protected static void assertSingleTokenConsistent(final List<CassandraNode> nodes) {
+ final int expectedNodeCount = nodes.size();
+ Runnable tokenCheck = new Runnable() {
+ public void run() {
+ Set<BigInteger> distinctTokens = Sets.newLinkedHashSet();
+ for (Entity member : nodes) {
+ EntityTestUtils.assertAttributeEquals(member, Startable.SERVICE_UP, true);
+ EntityTestUtils.assertConfigEquals(member, CassandraNode.NUM_TOKENS_PER_NODE, 1);
+ EntityTestUtils.assertAttributeEquals(member, CassandraNode.PEERS, expectedNodeCount);
+ BigInteger primaryToken = member.getAttribute(CassandraNode.TOKEN);
+ Set<BigInteger> memberTokens = member.getAttribute(CassandraNode.TOKENS);
+ assertNotNull(primaryToken);
+ assertEquals(memberTokens, ImmutableSet.of(primaryToken));
+ distinctTokens.addAll(memberTokens);
+ }
+ // a token shared between nodes would collapse in the set and leave it smaller than the node count
+ assertEquals(distinctTokens.size(), expectedNodeCount);
+ }
+ };
+ Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), tokenCheck);
+ }
+
+ /**
+ * Asserts, retrying with a two-minute timeout, that each node is up, is configured with 256 tokens
+ * per node, publishes as many tokens as the first node's tokens-per-node value, and that no token
+ * is shared between nodes. The expected peer count is tokens-per-node times the number of nodes.
+ *
+ * @param nodes the nodes to check; the first one supplies the expected tokens-per-node value
+ */
+ protected static void assertVnodeTokensConsistent(final List<CassandraNode> nodes) {
+ final int nodeCount = nodes.size();
+ final int tokensPerNode = Iterables.get(nodes, 0).getNumTokensPerNode();
+ final int expectedTotalTokens = tokensPerNode*nodeCount;
+
+ Runnable tokenCheck = new Runnable() {
+ public void run() {
+ Set<BigInteger> distinctTokens = Sets.newLinkedHashSet();
+ for (Entity member : nodes) {
+ EntityTestUtils.assertAttributeEquals(member, Startable.SERVICE_UP, true);
+ EntityTestUtils.assertAttributeEquals(member, CassandraNode.PEERS, expectedTotalTokens);
+ EntityTestUtils.assertConfigEquals(member, CassandraNode.NUM_TOKENS_PER_NODE, 256);
+ BigInteger primaryToken = member.getAttribute(CassandraNode.TOKEN);
+ Set<BigInteger> memberTokens = member.getAttribute(CassandraNode.TOKENS);
+ assertNotNull(primaryToken);
+ assertEquals(memberTokens.size(), tokensPerNode, "tokens="+memberTokens);
+ distinctTokens.addAll(memberTokens);
+ }
+ // duplicates across nodes would make the combined set smaller than the expected total
+ assertEquals(distinctTokens.size(), expectedTotalTokens);
+ }
+ };
+ Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), tokenCheck);
+ }
+
+ /**
+ * Calls {@link #checkConnection(int, Iterable)} until it succeeds, sleeping ten seconds after each
+ * failed attempt, and rethrows the last exception once the allowed number of attempts is used up.
+ *
+ * @param totalAttemptsAllowed number of attempts after which the failure is rethrown
+ * @param numRetriesPerAttempt retry count handed to each {@code checkConnection} call
+ * @param nodes the nodes to write to and read from
+ * @throws Exception the failure from the final attempt
+ */
+ protected static void checkConnectionRepeatedly(int totalAttemptsAllowed, int numRetriesPerAttempt, Iterable<? extends Entity> nodes) throws Exception {
+ for (int attemptNum = 1; ; attemptNum++) {
+ try {
+ checkConnection(numRetriesPerAttempt, nodes);
+ return;
+ } catch (Exception e) {
+ boolean attemptsExhausted = attemptNum >= totalAttemptsAllowed;
+ if (attemptsExhausted) {
+ log.warn("Cassandra not usable, "+attemptNum+" attempts; failing: "+e, e);
+ throw e;
+ }
+ log.warn("Cassandra not usable (attempt "+attemptNum+" of "+totalAttemptsAllowed+"), trying again after delay: "+e, e);
+ Time.sleep(Duration.TEN_SECONDS);
+ }
+ }
+ }
+
+ /**
+ * Fails if the first node reports more than one schema version; otherwise writes sample data
+ * through the first node and reads it back through every node, using the Astyanax sample client.
+ *
+ * @param numRetries retry count passed to the write and to each read
+ * @param nodes the Cassandra nodes to use; the first one performs the write
+ * @throws ConnectionException if the Astyanax client cannot talk to the cluster
+ */
+ protected static void checkConnection(int numRetries, Iterable<? extends Entity> nodes) throws ConnectionException {
+ CassandraNode writerNode = (CassandraNode) Iterables.get(nodes, 0);
+
+ // have been seeing intermittent SchemaDisagreementException errors on AWS, probably due to Astyanax / how we are using it
+ // (confirmed that clocks are in sync)
+ String columnFamily = Identifiers.makeRandomId(8);
+ AstyanaxSample writer = AstyanaxSample.builder().node(writerNode).columnFamilyName(columnFamily).build();
+ AstyanaxContext<Cluster> clusterContext = writer.newAstyanaxContextForCluster();
+ Map<String, List<String>> schemaVersions;
+ try {
+ schemaVersions = clusterContext.getEntity().describeSchemaVersions();
+ } finally {
+ clusterContext.shutdown();
+ }
+
+ log.info("Cassandra schema versions are: "+schemaVersions);
+ boolean schemaAgreed = schemaVersions.size() <= 1;
+ if (!schemaAgreed) {
+ Assert.fail("Inconsistent versions on Cassandra start: "+schemaVersions);
+ }
+
+ // the write returns the keyspace name that the reads below must use
+ String keyspaceName = writer.writeData("BrooklynTests_"+Identifiers.makeRandomId(8), numRetries);
+
+ for (Entity readerNode : nodes) {
+ AstyanaxSample reader = AstyanaxSample.builder().node((CassandraNode)readerNode).columnFamilyName(columnFamily).build();
+ reader.readData(keyspaceName, numRetries);
+ }
+ }
+
+ /**
+ * Asks the given node for the cluster's schema versions.
+ *
+ * @param node the node to query
+ * @return {@code true} if exactly one schema version is reported, {@code false} if any other
+ * number is reported, or {@code null} if the query threw an exception
+ */
+ protected static Boolean areVersionsConsistent(CassandraNode node) {
+ AstyanaxContext<Cluster> clusterContext = null;
+ try {
+ clusterContext = new AstyanaxSample(node).newAstyanaxContextForCluster();
+ Map<String, List<String>> schemaVersions = clusterContext.getEntity().describeSchemaVersions();
+ int distinctVersions = schemaVersions.size();
+ return Boolean.valueOf(distinctVersions == 1);
+ } catch (Exception e) {
+ // swallowed on purpose: null tells the caller the versions could not be obtained
+ return null;
+ } finally {
+ if (clusterContext != null) {
+ clusterContext.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Probes the node's thrift port by opening and immediately closing a TCP connection to the
+ * node's advertised hostname.
+ *
+ * @param node the node to probe
+ * @return {@code true} if the connection could be opened; {@code false} if the hostname is not
+ * yet known or any exception occurred
+ */
+ protected static boolean isSocketOpen(CassandraNode node) {
+ try {
+ String hostname = node.getAttribute(Attributes.HOSTNAME);
+ if (hostname == null) {
+ // new Socket(null, port) connects to the loopback interface, which would probe the
+ // machine running the test instead of the node; treat a missing hostname as not open
+ return false;
+ }
+ Socket s = new Socket(hostname, node.getThriftPort());
+ s.close();
+ return true;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterRebindIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterRebindIntegrationTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterRebindIntegrationTest.java
new file mode 100644
index 0000000..4c2a248
--- /dev/null
+++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterRebindIntegrationTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import static org.testng.Assert.assertNotNull;
+
+import java.math.BigInteger;
+import java.util.Set;
+
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraDatacenter;
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.proxy.nginx.NginxController;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.rebind.RebindOptions;
+import brooklyn.entity.rebind.RebindTestFixtureWithApp;
+import brooklyn.entity.trait.Startable;
+import brooklyn.location.basic.LocalhostMachineProvisioningLocation;
+import brooklyn.test.EntityTestUtils;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+/**
+ * Test the operation of the {@link NginxController} class.
+ */
+public class CassandraDatacenterRebindIntegrationTest extends RebindTestFixtureWithApp {
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraDatacenterRebindIntegrationTest.class);
+
+ private LocalhostMachineProvisioningLocation localhostProvisioningLocation;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ CassandraNodeIntegrationTest.assertCassandraPortsAvailableEventually();
+ super.setUp();
+ localhostProvisioningLocation = origApp.newLocalhostProvisioningLocation();
+ }
+
+ @AfterMethod(alwaysRun=true)
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ CassandraNodeIntegrationTest.assertCassandraPortsAvailableEventually();
+ }
+
+ /**
+ * Test that Brooklyn can rebind to a single node datacenter.
+ */
+ @Test(groups = "Integration")
+ public void testRebindDatacenterOfSizeOne() throws Exception {
+ CassandraDatacenter origDatacenter = origApp.createAndManageChild(EntitySpec.create(CassandraDatacenter.class)
+ .configure("initialSize", 1));
+
+ origApp.start(ImmutableList.of(localhostProvisioningLocation));
+ CassandraNode origNode = (CassandraNode) Iterables.get(origDatacenter.getMembers(), 0);
+
+ EntityTestUtils.assertAttributeEqualsEventually(origDatacenter, CassandraDatacenter.GROUP_SIZE, 1);
+ CassandraDatacenterLiveTest.assertNodesConsistent(ImmutableList.of(origNode));
+ CassandraDatacenterLiveTest.assertSingleTokenConsistent(ImmutableList.of(origNode));
+ CassandraDatacenterLiveTest.checkConnectionRepeatedly(2, 5, ImmutableList.of(origNode));
+ BigInteger origToken = origNode.getAttribute(CassandraNode.TOKEN);
+ Set<BigInteger> origTokens = origNode.getAttribute(CassandraNode.TOKENS);
+ assertNotNull(origToken);
+
+ newApp = rebind(RebindOptions.create().terminateOrigManagementContext(true));
+ final CassandraDatacenter newDatacenter = (CassandraDatacenter) Iterables.find(newApp.getChildren(), Predicates.instanceOf(CassandraDatacenter.class));
+ final CassandraNode newNode = (CassandraNode) Iterables.find(newDatacenter.getMembers(), Predicates.instanceOf(CassandraNode.class));
+
+ EntityTestUtils.assertAttributeEqualsEventually(newDatacenter, CassandraDatacenter.GROUP_SIZE, 1);
+ EntityTestUtils.assertAttributeEqualsEventually(newNode, Startable.SERVICE_UP, true);
+ EntityTestUtils.assertAttributeEqualsEventually(newNode, CassandraNode.TOKEN, origToken);
+ EntityTestUtils.assertAttributeEqualsEventually(newNode, CassandraNode.TOKENS, origTokens);
+ CassandraDatacenterLiveTest.assertNodesConsistent(ImmutableList.of(newNode));
+ CassandraDatacenterLiveTest.assertSingleTokenConsistent(ImmutableList.of(newNode));
+ CassandraDatacenterLiveTest.checkConnectionRepeatedly(2, 5, ImmutableList.of(newNode));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterTest.java
new file mode 100644
index 0000000..3a1d202
--- /dev/null
+++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import static org.testng.Assert.assertEquals;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraDatacenter;
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.BrooklynAppUnitTestSupport;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.EmptySoftwareProcess;
+import brooklyn.entity.basic.EmptySoftwareProcessSshDriver;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.location.LocationSpec;
+import brooklyn.location.basic.LocalhostMachineProvisioningLocation;
+import brooklyn.test.EntityTestUtils;
+import brooklyn.util.ResourceUtils;
+import brooklyn.util.javalang.JavaClassNames;
+import brooklyn.util.text.TemplateProcessor;
+import brooklyn.util.time.Duration;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+public class CassandraDatacenterTest extends BrooklynAppUnitTestSupport {
+
+ private static final Logger log = LoggerFactory.getLogger(CassandraDatacenterTest.class);
+
+ private LocalhostMachineProvisioningLocation loc;
+ private CassandraDatacenter cluster;
+
+ @BeforeMethod(alwaysRun=true)
+ @Override
+ public void setUp() throws Exception {
+ super.setUp(); // parent fixture set-up runs first; mgmt is used on the next line
+ loc = mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachineProvisioningLocation.class)); // localhost provisioning location that the tests start the app in
+ }
+
+ /** Checks that, once started, a two-member cluster publishes both members as its current seeds. */
+ @Test
+ public void testPopulatesInitialSeeds() throws Exception {
+ EntitySpec<CassandraDatacenter> clusterSpec = EntitySpec.create(CassandraDatacenter.class)
+ .configure(CassandraDatacenter.INITIAL_SIZE, 2)
+ .configure(CassandraDatacenter.TOKEN_SHIFT, BigInteger.ZERO)
+ .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO)
+ .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class));
+ cluster = app.createAndManageChild(clusterSpec);
+
+ app.start(ImmutableList.of(loc));
+ EmptySoftwareProcess firstMember = (EmptySoftwareProcess) Iterables.get(cluster.getMembers(), 0);
+ EmptySoftwareProcess secondMember = (EmptySoftwareProcess) Iterables.get(cluster.getMembers(), 1);
+
+ EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.CURRENT_SEEDS, ImmutableSet.<Entity>of(firstMember, secondMember));
+ }
+
+ @Test(groups="Integration") // because takes approx 2 seconds
+ public void testUpdatesSeedsOnFailuresAndAdditions() throws Exception {
+ doTestUpdatesSeedsOnFailuresAndAdditions(true, false); // arguments are fast=true, checkSeedsConstantOnRejoining=false
+ }
+
+ /**
+ * Starts a two-member cluster, stops the first member, resizes to three, and checks that the
+ * published seeds follow each change: both members, then only the second, then the second and
+ * the newly added third.
+ *
+ * @param fast if true, SERVICE_UP is also set directly on the first member after its driver is
+ * stopped or relaunched, instead of only waiting for it to change
+ * @param checkSeedsConstantOnRejoining if true, the first member is relaunched afterwards and the
+ * seeds are asserted to stay as the second and third members
+ */
+ protected void doTestUpdatesSeedsOnFailuresAndAdditions(boolean fast, boolean checkSeedsConstantOnRejoining) throws Exception {
+ cluster = app.createAndManageChild(EntitySpec.create(CassandraDatacenter.class)
+ .configure(CassandraDatacenter.INITIAL_SIZE, 2)
+ .configure(CassandraDatacenter.TOKEN_SHIFT, BigInteger.ZERO)
+ .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO)
+ .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class)));
+
+ app.start(ImmutableList.of(loc));
+ EmptySoftwareProcess e1 = (EmptySoftwareProcess) Iterables.get(cluster.getMembers(), 0);
+ EmptySoftwareProcess e2 = (EmptySoftwareProcess) Iterables.get(cluster.getMembers(), 1);
+ EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.CURRENT_SEEDS, ImmutableSet.<Entity>of(e1, e2));
+ log.debug("Test "+JavaClassNames.niceClassAndMethod()+", cluster "+cluster+" has "+cluster.getMembers()+"; e1="+e1+" e2="+e2);
+
+ // calling the driver stop for this entity will cause SERVICE_UP to become false, and stay false
+ // (and that's all it does, incidentally); if we just set the attribute it will become true on serviceUp sensor feed
+ e1.getDriver().stop();
+ // not necessary, but speeds things up:
+ if (fast) {
+ ((EntityInternal)e1).setAttribute(Attributes.SERVICE_UP, false);
+ }
+
+ EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.CURRENT_SEEDS, ImmutableSet.<Entity>of(e2));
+
+ cluster.resize(3);
+ EmptySoftwareProcess e3 = (EmptySoftwareProcess) Iterables.getOnlyElement(Sets.difference(ImmutableSet.copyOf(cluster.getMembers()), ImmutableSet.of(e1,e2)));
+ log.debug("Test "+JavaClassNames.niceClassAndMethod()+", cluster "+cluster+" has "+cluster.getMembers()+"; e3="+e3);
+ try {
+ EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.CURRENT_SEEDS, ImmutableSet.<Entity>of(e2, e3));
+ } finally {
+ // log the final state whether or not the assertion passed, to help diagnose failures
+ log.debug("Test "+JavaClassNames.niceClassAndMethod()+", cluster "+cluster+" has "+cluster.getMembers()+"; seeds "+cluster.getAttribute(CassandraDatacenter.CURRENT_SEEDS));
+ }
+
+ // the original condition was negated, so the rejoin check ran only when the flag was false
+ if (checkSeedsConstantOnRejoining) {
+ // cluster should not revert to e1+e2, simply because e1 has come back; but e1 should rejoin the group
+ // (not that important, and waits for 1s, so only done as part of integration)
+ ((EmptySoftwareProcessSshDriver)(e1.getDriver())).launch();
+ if (fast) {
+ ((EntityInternal)e1).setAttribute(Attributes.SERVICE_UP, true);
+ }
+ EntityTestUtils.assertAttributeEqualsEventually(e1, CassandraNode.SERVICE_UP, true);
+ EntityTestUtils.assertAttributeEqualsContinually(cluster, CassandraDatacenter.CURRENT_SEEDS, ImmutableSet.<Entity>of(e2, e3));
+ }
+ }
+
+ /**
+ * Checks that, without vnodes, the two members are configured with the single tokens
+ * -9223372036854775808 and 0, and that no member has a token set configured.
+ */
+ @Test
+ public void testPopulatesInitialTokens() throws Exception {
+ EntitySpec<CassandraDatacenter> clusterSpec = EntitySpec.create(CassandraDatacenter.class)
+ .configure(CassandraDatacenter.INITIAL_SIZE, 2)
+ .configure(CassandraDatacenter.TOKEN_SHIFT, BigInteger.ZERO)
+ .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO)
+ .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class));
+ cluster = app.createAndManageChild(clusterSpec);
+
+ app.start(ImmutableList.of(loc));
+
+ // collect the single-token and token-set config values across all members
+ Set<BigInteger> singleTokens = Sets.newLinkedHashSet();
+ Set<BigInteger> tokenSets = Sets.newLinkedHashSet();
+ for (Entity member : cluster.getMembers()) {
+ BigInteger configuredToken = member.getConfig(CassandraNode.TOKEN);
+ Set<BigInteger> configuredTokens = member.getConfig(CassandraNode.TOKENS);
+ if (configuredToken != null) {
+ singleTokens.add(configuredToken);
+ }
+ if (configuredTokens != null) {
+ tokenSets.addAll(configuredTokens);
+ }
+ }
+ assertEquals(singleTokens, ImmutableSet.of(new BigInteger("-9223372036854775808"), BigInteger.ZERO));
+ assertEquals(tokenSets, ImmutableSet.of());
+ }
+
+ /** Checks that, with vnodes enabled, no member has a single token or a token set configured. */
+ @Test
+ public void testDoesNotPopulateInitialTokens() throws Exception {
+ EntitySpec<CassandraDatacenter> clusterSpec = EntitySpec.create(CassandraDatacenter.class)
+ .configure(CassandraDatacenter.INITIAL_SIZE, 2)
+ .configure(CassandraDatacenter.USE_VNODES, true)
+ .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO)
+ .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class));
+ cluster = app.createAndManageChild(clusterSpec);
+
+ app.start(ImmutableList.of(loc));
+
+ // collect the single-token and token-set config values across all members
+ Set<BigInteger> singleTokens = Sets.newLinkedHashSet();
+ Set<BigInteger> tokenSets = Sets.newLinkedHashSet();
+ for (Entity member : cluster.getMembers()) {
+ BigInteger configuredToken = member.getConfig(CassandraNode.TOKEN);
+ Set<BigInteger> configuredTokens = member.getConfig(CassandraNode.TOKENS);
+ if (configuredToken != null) {
+ singleTokens.add(configuredToken);
+ }
+ if (configuredTokens != null) {
+ tokenSets.addAll(configuredTokens);
+ }
+ }
+ assertEquals(singleTokens, ImmutableSet.of());
+ assertEquals(tokenSets, ImmutableSet.of());
+ }
+
+ public static class MockInputForTemplate {
+ public BigInteger getToken() { return new BigInteger("-9223372036854775808"); }
+ public String getTokensAsString() { return "" + getToken(); }
+ public int getNumTokensPerNode() { return 1; }
+ public String getSeeds() { return ""; }
+ public int getGossipPort() { return 1234; }
+ public int getSslGossipPort() { return 1234; }
+ public int getThriftPort() { return 1234; }
+ public int getNativeTransportPort() { return 1234; }
+ public String getClusterName() { return "Mock"; }
+ public String getEndpointSnitchName() { return ""; }
+ public String getListenAddress() { return "0"; }
+ public String getBroadcastAddress() { return "0"; }
+ public String getRpcAddress() { return "0"; }
+ public String getRunDir() { return "/tmp/mock"; }
+ }
+
+ /**
+ * Renders the Cassandra config template for version 1.2 with mock inputs and checks that the
+ * token is written as plain digits, with no thousands separators.
+ */
+ @Test
+ public void testBigIntegerFormattedCorrectly() {
+ MockInputForTemplate mockEntity = new MockInputForTemplate();
+ MockInputForTemplate mockDriver = new MockInputForTemplate();
+ Map<String, Object> templateModel = ImmutableMap.<String, Object>builder()
+ .put("entity", mockEntity)
+ .put("driver", mockDriver)
+ .build();
+
+ // the template URL is itself a template, resolved here against the major.minor version
+ String urlTemplate = CassandraNode.CASSANDRA_CONFIG_TEMPLATE_URL.getDefaultValue();
+ String resolvedUrl = TemplateProcessor.processTemplateContents(urlTemplate, ImmutableMap.of("entity", ImmutableMap.of("majorMinorVersion", "1.2")));
+ String rawTemplate = new ResourceUtils(this).getResourceAsString(resolvedUrl);
+ String rendered = TemplateProcessor.processTemplateContents(rawTemplate, templateModel);
+ // "775,808" would be the tail of the token if it had been formatted with grouping separators
+ Assert.assertEquals(rendered.indexOf("775,808"), -1);
+ Assert.assertTrue(rendered.indexOf("-9223372036854775808") > 0);
+ }
+
+ @Test(groups="Integration") // because takes approx 30 seconds
+ public void testUpdatesSeedsFastishManyTimes() throws Exception {
+ final int COUNT = 20;
+ for (int i=0; i<COUNT; i++) {
+ log.info("Test "+JavaClassNames.niceClassAndMethod()+", iteration "+(i+1)+" of "+COUNT);
+ try {
+ doTestUpdatesSeedsOnFailuresAndAdditions(true, true);
+ tearDown();
+ setUp();
+ } catch (Exception e) {
+ log.warn("Error in "+JavaClassNames.niceClassAndMethod()+", iteration "+(i+1)+" of "+COUNT, e);
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Runs the seed-update scenario once with fast mode off and the rejoining flag on, then tears
+ * the fixture down and sets it up again.
+ */
+ @Test(groups="Integration") // because takes approx 5 seconds
+ public void testUpdateSeedsSlowAndRejoining() throws Exception {
+ final int COUNT = 1;
+ for (int iteration = 1; iteration <= COUNT; iteration++) {
+ log.info("Test "+JavaClassNames.niceClassAndMethod()+", iteration "+iteration+" of "+COUNT);
+ doTestUpdatesSeedsOnFailuresAndAdditions(false, true);
+ tearDown();
+ setUp();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricTest.java
new file mode 100644
index 0000000..cbf55ed
--- /dev/null
+++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraDatacenter;
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraFabric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.BrooklynAppUnitTestSupport;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.AbstractEntity;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.EmptySoftwareProcess;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.basic.EntityLocal;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.basic.ServiceStateLogic;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.entity.trait.Startable;
+import brooklyn.location.Location;
+import brooklyn.location.LocationSpec;
+import brooklyn.location.basic.LocalhostMachineProvisioningLocation;
+import brooklyn.test.EntityTestUtils;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+public class CassandraFabricTest extends BrooklynAppUnitTestSupport {
+
+ private static final Logger log = LoggerFactory.getLogger(CassandraFabricTest.class);
+
+ private LocalhostMachineProvisioningLocation loc1;
+ private LocalhostMachineProvisioningLocation loc2;
+ private CassandraFabric fabric;
+
+ @BeforeMethod(alwaysRun=true)
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ loc1 = mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachineProvisioningLocation.class));
+ loc2 = mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachineProvisioningLocation.class));
+ }
+
+ @Test
+ public void testPopulatesInitialSeeds() throws Exception {
+ fabric = app.createAndManageChild(EntitySpec.create(CassandraFabric.class)
+ .configure(CassandraFabric.INITIAL_QUORUM_SIZE, 2)
+ .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO)
+ .configure(CassandraFabric.MEMBER_SPEC, EntitySpec.create(CassandraDatacenter.class)
+ .configure(CassandraDatacenter.INITIAL_SIZE, 2)
+ .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class))));
+
+ app.start(ImmutableList.of(loc1, loc2));
+ CassandraDatacenter d1 = (CassandraDatacenter) Iterables.get(fabric.getMembers(), 0);
+ CassandraDatacenter d2 = (CassandraDatacenter) Iterables.get(fabric.getMembers(), 1);
+
+ final EmptySoftwareProcess d1a = (EmptySoftwareProcess) Iterables.get(d1.getMembers(), 0);
+ final EmptySoftwareProcess d1b = (EmptySoftwareProcess) Iterables.get(d1.getMembers(), 1);
+
+ final EmptySoftwareProcess d2a = (EmptySoftwareProcess) Iterables.get(d2.getMembers(), 0);
+ final EmptySoftwareProcess d2b = (EmptySoftwareProcess) Iterables.get(d2.getMembers(), 1);
+
+ Predicate<Set<Entity>> predicate = new Predicate<Set<Entity>>() {
+ @Override public boolean apply(Set<Entity> input) {
+ return input != null && input.size() >= 2 &&
+ Sets.intersection(input, ImmutableSet.of(d1a, d1b)).size() == 1 &&
+ Sets.intersection(input, ImmutableSet.of(d2a, d2b)).size() == 1;
+ }
+ };
+ EntityTestUtils.assertAttributeEventually(fabric, CassandraFabric.CURRENT_SEEDS, predicate);
+ EntityTestUtils.assertAttributeEventually(d1, CassandraDatacenter.CURRENT_SEEDS, predicate);
+ EntityTestUtils.assertAttributeEventually(d2, CassandraDatacenter.CURRENT_SEEDS, predicate);
+
+ Set<Entity> seeds = fabric.getAttribute(CassandraFabric.CURRENT_SEEDS);
+ assertEquals(d1.getAttribute(CassandraDatacenter.CURRENT_SEEDS), seeds);
+ assertEquals(d2.getAttribute(CassandraDatacenter.CURRENT_SEEDS), seeds);
+ log.info("Seeds="+seeds);
+ }
+
+ @Test
+ public void testPopulatesInitialSeedsWhenNodesOfOneClusterComeUpBeforeTheOtherCluster() throws Exception {
+ fabric = app.createAndManageChild(EntitySpec.create(CassandraFabric.class)
+ .configure(CassandraFabric.INITIAL_QUORUM_SIZE, 2)
+ .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO)
+ .configure(CassandraFabric.MEMBER_SPEC, EntitySpec.create(CassandraDatacenter.class)
+ .configure(CassandraDatacenter.INITIAL_SIZE, 2)
+ .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(DummyCassandraNode.class))));
+
+ Thread t = new Thread() {
+ public void run() {
+ app.start(ImmutableList.of(loc1, loc2));
+ }
+ };
+ t.start();
+ try {
+ EntityTestUtils.assertGroupSizeEqualsEventually(fabric, 2);
+ CassandraDatacenter d1 = (CassandraDatacenter) Iterables.get(fabric.getMembers(), 0);
+ CassandraDatacenter d2 = (CassandraDatacenter) Iterables.get(fabric.getMembers(), 1);
+
+ EntityTestUtils.assertGroupSizeEqualsEventually(d1, 2);
+ final DummyCassandraNode d1a = (DummyCassandraNode) Iterables.get(d1.getMembers(), 0);
+ final DummyCassandraNode d1b = (DummyCassandraNode) Iterables.get(d1.getMembers(), 1);
+
+ EntityTestUtils.assertGroupSizeEqualsEventually(d2, 2);
+ final DummyCassandraNode d2a = (DummyCassandraNode) Iterables.get(d2.getMembers(), 0);
+ final DummyCassandraNode d2b = (DummyCassandraNode) Iterables.get(d2.getMembers(), 1);
+
+ d1a.setAttribute(Attributes.HOSTNAME, "d1a");
+ d1b.setAttribute(Attributes.HOSTNAME, "d1b");
+
+ Thread.sleep(1000);
+ d2a.setAttribute(Attributes.HOSTNAME, "d2a");
+ d2b.setAttribute(Attributes.HOSTNAME, "d2b");
+
+ Predicate<Set<Entity>> predicate = new Predicate<Set<Entity>>() {
+ @Override public boolean apply(Set<Entity> input) {
+ return input != null && input.size() >= 2 &&
+ Sets.intersection(input, ImmutableSet.of(d1a, d1b)).size() == 1 &&
+ Sets.intersection(input, ImmutableSet.of(d2a, d2b)).size() == 1;
+ }
+ };
+ EntityTestUtils.assertAttributeEventually(fabric, CassandraFabric.CURRENT_SEEDS, predicate);
+ EntityTestUtils.assertAttributeEventually(d1, CassandraDatacenter.CURRENT_SEEDS, predicate);
+ EntityTestUtils.assertAttributeEventually(d2, CassandraDatacenter.CURRENT_SEEDS, predicate);
+
+ Set<Entity> seeds = fabric.getAttribute(CassandraFabric.CURRENT_SEEDS);
+ assertEquals(d1.getAttribute(CassandraDatacenter.CURRENT_SEEDS), seeds);
+ assertEquals(d2.getAttribute(CassandraDatacenter.CURRENT_SEEDS), seeds);
+ log.info("Seeds="+seeds);
+ } finally {
+ log.info("Failed seeds; fabric="+fabric.getAttribute(CassandraFabric.CURRENT_SEEDS));
+ t.interrupt();
+ }
+ }
+
+
+ @ImplementedBy(DummyCassandraNodeImpl.class)
+ public interface DummyCassandraNode extends Entity, Startable, EntityLocal, EntityInternal {
+ }
+
+ public static class DummyCassandraNodeImpl extends AbstractEntity implements DummyCassandraNode {
+
+ @Override
+ public void start(Collection<? extends Location> locations) {
+ ServiceStateLogic.setExpectedState(this, Lifecycle.STARTING);
+ }
+
+ @Override
+ public void stop() {
+ ServiceStateLogic.setExpectedState(this, Lifecycle.STOPPING);
+ }
+
+ @Override
+ public void restart() {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java
new file mode 100644
index 0000000..495843f
--- /dev/null
+++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode;
+import org.apache.brooklyn.entity.nosql.cassandra.AstyanaxSupport.AstyanaxSample;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.AbstractEc2LiveTest;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.location.Location;
+import brooklyn.test.EntityTestUtils;
+
+import com.google.common.collect.ImmutableList;
+
+ /**
+  * Live test that starts a single {@link CassandraNode} in the location supplied by
+  * {@link AbstractEc2LiveTest} and runs the Astyanax sample against it.
+  */
+ public class CassandraNodeEc2LiveTest extends AbstractEc2LiveTest {
+
+     private static final Logger log = LoggerFactory.getLogger(CassandraNodeEc2LiveTest.class);
+
+     /**
+      * Creates the node with thrift port "9876+" and cluster name "TestCluster", starts the app
+      * in {@code loc}, waits for SERVICE_UP to become true, then runs the Astyanax sample.
+      */
+     @Override
+     protected void doTest(Location loc) throws Exception {
+         log.info("Testing Cassandra on {}", loc);
+
+         EntitySpec<CassandraNode> nodeSpec = EntitySpec.create(CassandraNode.class)
+                 .configure("thriftPort", "9876+")
+                 .configure("clusterName", "TestCluster");
+         CassandraNode node = app.createAndManageChild(nodeSpec);
+         app.start(ImmutableList.of(loc));
+
+         EntityTestUtils.assertAttributeEqualsEventually(node, CassandraNode.SERVICE_UP, true);
+
+         new AstyanaxSample(node).astyanaxTest();
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeIntegrationTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeIntegrationTest.java
new file mode 100644
index 0000000..b5a657f
--- /dev/null
+++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeIntegrationTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode;
+import org.apache.brooklyn.entity.nosql.cassandra.AstyanaxSupport.AstyanaxSample;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.trait.Startable;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+import brooklyn.test.Asserts;
+import brooklyn.test.EntityTestUtils;
+import brooklyn.test.NetworkingTestUtils;
+import brooklyn.util.math.MathPredicates;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+
+/**
+ * Cassandra integration tests.
+ *
+ * Test the operation of the {@link CassandraNode} class.
+ */
+public class CassandraNodeIntegrationTest extends AbstractCassandraNodeTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraNodeIntegrationTest.class);
+
+ public static void assertCassandraPortsAvailableEventually() {
+ Map<String, Integer> ports = getCassandraDefaultPorts();
+ NetworkingTestUtils.assertPortsAvailableEventually(ports);
+ LOG.info("Confirmed Cassandra ports are available: "+ports);
+ }
+
+ public static Map<String, Integer> getCassandraDefaultPorts() {
+ List<PortAttributeSensorAndConfigKey> ports = ImmutableList.of(
+ CassandraNode.GOSSIP_PORT,
+ CassandraNode.SSL_GOSSIP_PORT,
+ CassandraNode.THRIFT_PORT,
+ CassandraNode.NATIVE_TRANSPORT_PORT,
+ CassandraNode.RMI_REGISTRY_PORT);
+ Map<String, Integer> result = Maps.newLinkedHashMap();
+ for (PortAttributeSensorAndConfigKey key : ports) {
+ result.put(key.getName(), key.getConfigKey().getDefaultValue().iterator().next());
+ }
+ return result;
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ @Override
+ public void setUp() throws Exception {
+ assertCassandraPortsAvailableEventually();
+ super.setUp();
+ }
+
+ @AfterMethod(alwaysRun=true)
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ assertCassandraPortsAvailableEventually();
+ }
+
+ /**
+ * Test that a node starts and sets SERVICE_UP correctly.
+ */
+ @Test(groups = "Integration")
+ public void canStartupAndShutdown() {
+ cassandra = app.createAndManageChild(EntitySpec.create(CassandraNode.class)
+ .configure("jmxPort", "11099+")
+ .configure("rmiRegistryPort", "19001+"));
+ app.start(ImmutableList.of(testLocation));
+
+ EntityTestUtils.assertAttributeEqualsEventually(cassandra, Startable.SERVICE_UP, true);
+ Entities.dumpInfo(app);
+
+ cassandra.stop();
+
+ EntityTestUtils.assertAttributeEqualsEventually(cassandra, Startable.SERVICE_UP, false);
+ }
+
+ /**
+ * Test that a keyspace and column family can be created and used with Astyanax client.
+ */
+ @Test(groups = "Integration")
+ public void testConnection() throws Exception {
+ cassandra = app.createAndManageChild(EntitySpec.create(CassandraNode.class)
+ .configure("jmxPort", "11099+")
+ .configure("rmiRegistryPort", "19001+")
+ .configure("thriftPort", "9876+"));
+ app.start(ImmutableList.of(testLocation));
+
+ EntityTestUtils.assertAttributeEqualsEventually(cassandra, Startable.SERVICE_UP, true);
+
+ AstyanaxSample astyanax = new AstyanaxSample(cassandra);
+ astyanax.astyanaxTest();
+ }
+
+ /**
+ * Cassandra v2 needs Java >= 1.7. If you have java 6 as the defult locally, then you can use
+ * something like {@code .configure("shell.env", MutableMap.of("JAVA_HOME", "/Library/Java/JavaVirtualMachines/jdk1.7.0_51.jdk/Contents/Home"))}
+ */
+ @Test(groups = "Integration")
+ public void testCassandraVersion2() throws Exception {
+ // TODO In v2.0.10, the bin/cassandra script changed to add an additional check for JMX connectivity.
+ // This causes cassandera script to hang for us (presumably due to the CLASSPATH/JVM_OPTS we're passing
+ // in, regarding JMX agent).
+ // See:
+ // - https://issues.apache.org/jira/browse/CASSANDRA-7254
+ // - https://github.com/apache/cassandra/blame/trunk/bin/cassandra#L211-216
+
+ String version = "2.0.9";
+ String majorMinorVersion = "2.0";
+
+ cassandra = app.createAndManageChild(EntitySpec.create(CassandraNode.class)
+ .configure(CassandraNode.SUGGESTED_VERSION, version)
+ .configure(CassandraNode.NUM_TOKENS_PER_NODE, 256)
+ .configure("jmxPort", "11099+")
+ .configure("rmiRegistryPort", "19001+"));
+ app.start(ImmutableList.of(testLocation));
+
+ EntityTestUtils.assertAttributeEqualsEventually(cassandra, Startable.SERVICE_UP, true);
+ Entities.dumpInfo(app);
+
+ AstyanaxSample astyanax = new AstyanaxSample(cassandra);
+ astyanax.astyanaxTest();
+
+ assertEquals(cassandra.getMajorMinorVersion(), majorMinorVersion);
+
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertNotNull(cassandra.getAttribute(CassandraNode.TOKEN));
+ assertNotNull(cassandra.getAttribute(CassandraNode.TOKENS));
+ assertEquals(cassandra.getAttribute(CassandraNode.TOKENS).size(), 256, "tokens="+cassandra.getAttribute(CassandraNode.TOKENS));
+
+ assertEquals(cassandra.getAttribute(CassandraNode.PEERS), (Integer)256);
+ assertEquals(cassandra.getAttribute(CassandraNode.LIVE_NODE_COUNT), (Integer)1);
+
+ assertTrue(cassandra.getAttribute(CassandraNode.SERVICE_UP_JMX));
+ assertNotNull(cassandra.getAttribute(CassandraNode.THRIFT_PORT_LATENCY));
+
+ assertNotNull(cassandra.getAttribute(CassandraNode.READ_PENDING));
+ assertNotNull(cassandra.getAttribute(CassandraNode.READ_ACTIVE));
+ EntityTestUtils.assertAttribute(cassandra, CassandraNode.READ_COMPLETED, MathPredicates.greaterThanOrEqual(1));
+ assertNotNull(cassandra.getAttribute(CassandraNode.WRITE_PENDING));
+ assertNotNull(cassandra.getAttribute(CassandraNode.WRITE_ACTIVE));
+ EntityTestUtils.assertAttribute(cassandra, CassandraNode.WRITE_COMPLETED, MathPredicates.greaterThanOrEqual(1));
+
+ assertNotNull(cassandra.getAttribute(CassandraNode.READS_PER_SECOND_LAST));
+ assertNotNull(cassandra.getAttribute(CassandraNode.WRITES_PER_SECOND_LAST));
+
+ assertNotNull(cassandra.getAttribute(CassandraNode.THRIFT_PORT_LATENCY_IN_WINDOW));
+ assertNotNull(cassandra.getAttribute(CassandraNode.READS_PER_SECOND_IN_WINDOW));
+ assertNotNull(cassandra.getAttribute(CassandraNode.WRITES_PER_SECOND_IN_WINDOW));
+
+ // an example MXBean
+ EntityTestUtils.assertAttribute(cassandra, CassandraNode.MAX_HEAP_MEMORY, MathPredicates.greaterThanOrEqual(1));
+ }});
+
+ cassandra.stop();
+
+ EntityTestUtils.assertAttributeEqualsEventually(cassandra, Startable.SERVICE_UP, false);
+ }
+}