You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/11/25 13:17:53 UTC
[08/13] incubator-brooklyn git commit: Cassandra: support disabling direct connection
Cassandra: support disabling direct connection
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/0904ed97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/0904ed97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/0904ed97
Branch: refs/heads/master
Commit: 0904ed972813e659d96b706b31333818f4bfae3e
Parents: d32693a
Author: Aled Sage <al...@gmail.com>
Authored: Tue Nov 24 13:25:58 2015 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Nov 24 13:53:47 2015 +0000
----------------------------------------------------------------------
.../entity/nosql/cassandra/CassandraNode.java | 3 +
.../nosql/cassandra/CassandraNodeImpl.java | 239 ++++++++++---------
.../cassandra/CassandraNodeEc2LiveTest.java | 32 +++
3 files changed, 159 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0904ed97/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java
index 0f42be5..fb937ae 100644
--- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java
@@ -141,6 +141,9 @@ public interface CassandraNode extends DatastoreMixins.DatastoreCommon, Software
BasicAttributeSensorAndConfigKey<Set<BigInteger>> TOKENS = new BasicAttributeSensorAndConfigKey<Set<BigInteger>>(
new TypeToken<Set<BigInteger>>() {}, "cassandra.tokens", "Cassandra Tokens");
+ @SetFromFlag("useThriftMonitoring")
+ ConfigKey<Boolean> USE_THRIFT_MONITORING = ConfigKeys.newConfigKey("thriftMonitoring.enabled", "Thrift-port monitoring enabled", Boolean.TRUE);
+
AttributeSensor<Integer> PEERS = Sensors.newIntegerSensor( "cassandra.peers", "Number of peers in cluster");
AttributeSensor<Integer> LIVE_NODE_COUNT = Sensors.newIntegerSensor( "cassandra.liveNodeCount", "Number of live nodes in cluster");
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0904ed97/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java
index e08c99a..43bf4b2 100644
--- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java
@@ -46,6 +46,7 @@ import org.apache.brooklyn.core.sensor.DependentConfiguration;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.enricher.stock.Enrichers;
import org.apache.brooklyn.entity.java.JavaAppUtils;
+import org.apache.brooklyn.entity.java.UsesJmx;
import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl;
import org.apache.brooklyn.feed.function.FunctionFeed;
import org.apache.brooklyn.feed.function.FunctionPollConfig;
@@ -397,115 +398,119 @@ public class CassandraNodeImpl extends SoftwareProcessImpl implements CassandraN
jmxHelper = new JmxHelper(this);
boolean retrieveUsageMetrics = getConfig(RETRIEVE_USAGE_METRICS);
- jmxFeed = JmxFeed.builder()
- .entity(this)
- .period(3000, TimeUnit.MILLISECONDS)
- .helper(jmxHelper)
- .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP_JMX)
- .objectName(storageServiceMBean)
- .attributeName("Initialized")
- .onSuccess(Functions.forPredicate(Predicates.notNull()))
- .onException(Functions.constant(false))
- .suppressDuplicates(true))
- .pollAttribute(new JmxAttributePollConfig<Set<BigInteger>>(TOKENS)
- .objectName(storageServiceMBean)
- .attributeName("TokenToEndpointMap")
- .onSuccess(new Function<Object, Set<BigInteger>>() {
- @Override
- public Set<BigInteger> apply(@Nullable Object arg) {
- Map input = (Map)arg;
- if (input == null || input.isEmpty()) return null;
- // FIXME does not work on aws-ec2, uses RFC1918 address
- Predicate<String> self = Predicates.in(ImmutableList.of(getAttribute(HOSTNAME), getAttribute(ADDRESS), getAttribute(SUBNET_ADDRESS), getAttribute(SUBNET_HOSTNAME)));
- Set<String> tokens = Maps.filterValues(input, self).keySet();
- Set<BigInteger> result = Sets.newLinkedHashSet();
- for (String token : tokens) {
- result.add(new BigInteger(token));
+ if (getDriver().isJmxEnabled()) {
+ jmxFeed = JmxFeed.builder()
+ .entity(this)
+ .period(3000, TimeUnit.MILLISECONDS)
+ .helper(jmxHelper)
+ .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP_JMX)
+ .objectName(storageServiceMBean)
+ .attributeName("Initialized")
+ .onSuccess(Functions.forPredicate(Predicates.notNull()))
+ .onException(Functions.constant(false))
+ .suppressDuplicates(true))
+ .pollAttribute(new JmxAttributePollConfig<Set<BigInteger>>(TOKENS)
+ .objectName(storageServiceMBean)
+ .attributeName("TokenToEndpointMap")
+ .onSuccess(new Function<Object, Set<BigInteger>>() {
+ @Override
+ public Set<BigInteger> apply(@Nullable Object arg) {
+ Map input = (Map)arg;
+ if (input == null || input.isEmpty()) return null;
+ // FIXME does not work on aws-ec2, uses RFC1918 address
+ Predicate<String> self = Predicates.in(ImmutableList.of(getAttribute(HOSTNAME), getAttribute(ADDRESS), getAttribute(SUBNET_ADDRESS), getAttribute(SUBNET_HOSTNAME)));
+ Set<String> tokens = Maps.filterValues(input, self).keySet();
+ Set<BigInteger> result = Sets.newLinkedHashSet();
+ for (String token : tokens) {
+ result.add(new BigInteger(token));
+ }
+ return result;
+ }})
+ .onException(Functions.<Set<BigInteger>>constant(null))
+ .suppressDuplicates(true))
+ .pollOperation(new JmxOperationPollConfig<String>(DATACENTER_NAME)
+ .period(60, TimeUnit.SECONDS)
+ .objectName(snitchMBean)
+ .operationName("getDatacenter")
+ .operationParams(ImmutableList.of(getBroadcastAddress()))
+ .onException(Functions.<String>constant(null))
+ .suppressDuplicates(true))
+ .pollOperation(new JmxOperationPollConfig<String>(RACK_NAME)
+ .period(60, TimeUnit.SECONDS)
+ .objectName(snitchMBean)
+ .operationName("getRack")
+ .operationParams(ImmutableList.of(getBroadcastAddress()))
+ .onException(Functions.<String>constant(null))
+ .suppressDuplicates(true))
+ .pollAttribute(new JmxAttributePollConfig<Integer>(PEERS)
+ .objectName(storageServiceMBean)
+ .attributeName("TokenToEndpointMap")
+ .onSuccess(new Function<Object, Integer>() {
+ @Override
+ public Integer apply(@Nullable Object arg) {
+ Map input = (Map)arg;
+ if (input == null || input.isEmpty()) return 0;
+ return input.size();
}
- return result;
- }})
- .onException(Functions.<Set<BigInteger>>constant(null))
- .suppressDuplicates(true))
- .pollOperation(new JmxOperationPollConfig<String>(DATACENTER_NAME)
- .period(60, TimeUnit.SECONDS)
- .objectName(snitchMBean)
- .operationName("getDatacenter")
- .operationParams(ImmutableList.of(getBroadcastAddress()))
- .onException(Functions.<String>constant(null))
- .suppressDuplicates(true))
- .pollOperation(new JmxOperationPollConfig<String>(RACK_NAME)
- .period(60, TimeUnit.SECONDS)
- .objectName(snitchMBean)
- .operationName("getRack")
- .operationParams(ImmutableList.of(getBroadcastAddress()))
- .onException(Functions.<String>constant(null))
- .suppressDuplicates(true))
- .pollAttribute(new JmxAttributePollConfig<Integer>(PEERS)
- .objectName(storageServiceMBean)
- .attributeName("TokenToEndpointMap")
- .onSuccess(new Function<Object, Integer>() {
- @Override
- public Integer apply(@Nullable Object arg) {
- Map input = (Map)arg;
- if (input == null || input.isEmpty()) return 0;
- return input.size();
- }
- })
- .onException(Functions.constant(-1)))
- .pollAttribute(new JmxAttributePollConfig<Integer>(LIVE_NODE_COUNT)
- .objectName(storageServiceMBean)
- .attributeName("LiveNodes")
- .onSuccess(new Function<Object, Integer>() {
- @Override
- public Integer apply(@Nullable Object arg) {
- List input = (List)arg;
- if (input == null || input.isEmpty()) return 0;
- return input.size();
- }
- })
- .onException(Functions.constant(-1)))
- .pollAttribute(new JmxAttributePollConfig<Integer>(READ_ACTIVE)
- .objectName(readStageMBean)
- .attributeName("ActiveCount")
- .onException(Functions.constant((Integer)null))
- .enabled(retrieveUsageMetrics))
- .pollAttribute(new JmxAttributePollConfig<Long>(READ_PENDING)
- .objectName(readStageMBean)
- .attributeName("PendingTasks")
- .onException(Functions.constant((Long)null))
- .enabled(retrieveUsageMetrics))
- .pollAttribute(new JmxAttributePollConfig<Long>(READ_COMPLETED)
- .objectName(readStageMBean)
- .attributeName("CompletedTasks")
- .onException(Functions.constant((Long)null))
- .enabled(retrieveUsageMetrics))
- .pollAttribute(new JmxAttributePollConfig<Integer>(WRITE_ACTIVE)
- .objectName(mutationStageMBean)
- .attributeName("ActiveCount")
- .onException(Functions.constant((Integer)null))
- .enabled(retrieveUsageMetrics))
- .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_PENDING)
- .objectName(mutationStageMBean)
- .attributeName("PendingTasks")
- .onException(Functions.constant((Long)null))
- .enabled(retrieveUsageMetrics))
- .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_COMPLETED)
- .objectName(mutationStageMBean)
- .attributeName("CompletedTasks")
- .onException(Functions.constant((Long)null))
- .enabled(retrieveUsageMetrics))
- .build();
-
- functionFeed = FunctionFeed.builder()
- .entity(this)
- .period(3000, TimeUnit.MILLISECONDS)
- .poll(new FunctionPollConfig<Long, Long>(THRIFT_PORT_LATENCY)
- .onException(Functions.constant(-1L))
- .callable(new ThriftLatencyChecker(CassandraNodeImpl.this))
- .enabled(retrieveUsageMetrics))
- .build();
+ })
+ .onException(Functions.constant(-1)))
+ .pollAttribute(new JmxAttributePollConfig<Integer>(LIVE_NODE_COUNT)
+ .objectName(storageServiceMBean)
+ .attributeName("LiveNodes")
+ .onSuccess(new Function<Object, Integer>() {
+ @Override
+ public Integer apply(@Nullable Object arg) {
+ List input = (List)arg;
+ if (input == null || input.isEmpty()) return 0;
+ return input.size();
+ }
+ })
+ .onException(Functions.constant(-1)))
+ .pollAttribute(new JmxAttributePollConfig<Integer>(READ_ACTIVE)
+ .objectName(readStageMBean)
+ .attributeName("ActiveCount")
+ .onException(Functions.constant((Integer)null))
+ .enabled(retrieveUsageMetrics))
+ .pollAttribute(new JmxAttributePollConfig<Long>(READ_PENDING)
+ .objectName(readStageMBean)
+ .attributeName("PendingTasks")
+ .onException(Functions.constant((Long)null))
+ .enabled(retrieveUsageMetrics))
+ .pollAttribute(new JmxAttributePollConfig<Long>(READ_COMPLETED)
+ .objectName(readStageMBean)
+ .attributeName("CompletedTasks")
+ .onException(Functions.constant((Long)null))
+ .enabled(retrieveUsageMetrics))
+ .pollAttribute(new JmxAttributePollConfig<Integer>(WRITE_ACTIVE)
+ .objectName(mutationStageMBean)
+ .attributeName("ActiveCount")
+ .onException(Functions.constant((Integer)null))
+ .enabled(retrieveUsageMetrics))
+ .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_PENDING)
+ .objectName(mutationStageMBean)
+ .attributeName("PendingTasks")
+ .onException(Functions.constant((Long)null))
+ .enabled(retrieveUsageMetrics))
+ .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_COMPLETED)
+ .objectName(mutationStageMBean)
+ .attributeName("CompletedTasks")
+ .onException(Functions.constant((Long)null))
+ .enabled(retrieveUsageMetrics))
+ .build();
+
+ jmxMxBeanFeed = JavaAppUtils.connectMXBeanSensors(this);
+ }
- jmxMxBeanFeed = JavaAppUtils.connectMXBeanSensors(this);
+ if (Boolean.TRUE.equals(getConfig(USE_THRIFT_MONITORING))) {
+ functionFeed = FunctionFeed.builder()
+ .entity(this)
+ .period(3000, TimeUnit.MILLISECONDS)
+ .poll(new FunctionPollConfig<Long, Long>(THRIFT_PORT_LATENCY)
+ .onException(Functions.constant(-1L))
+ .callable(new ThriftLatencyChecker(CassandraNodeImpl.this))
+ .enabled(retrieveUsageMetrics))
+ .build();
+ }
connectServiceUpIsRunning();
}
@@ -530,15 +535,19 @@ public class CassandraNodeImpl extends SoftwareProcessImpl implements CassandraN
}
// service-up checks
- enrichers().add(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS)
- .from(THRIFT_PORT_LATENCY)
- .computing(Functionals.ifEquals(-1L).value("Thrift latency polling failed") )
- .build());
+ if (Boolean.TRUE.equals(getConfig(USE_THRIFT_MONITORING))) {
+ enrichers().add(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS)
+ .from(THRIFT_PORT_LATENCY)
+ .computing(Functionals.ifEquals(-1L).value("Thrift latency polling failed") )
+ .build());
+ }
- enrichers().add(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS)
- .from(SERVICE_UP_JMX)
- .computing(Functionals.ifEquals(false).value("JMX reports not up") )
- .build());
+ if (Boolean.TRUE.equals(getConfig(UsesJmx.USE_JMX))) {
+ enrichers().add(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS)
+ .from(SERVICE_UP_JMX)
+ .computing(Functionals.ifEquals(false).value("JMX reports not up") )
+ .build());
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0904ed97/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java
index 3899c3b..c1b8b04 100644
--- a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java
+++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java
@@ -18,15 +18,23 @@
*/
package org.apache.brooklyn.entity.nosql.cassandra;
+import static org.testng.Assert.assertNotNull;
+
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
+import org.apache.brooklyn.core.location.cloud.CloudLocationConfig;
import org.apache.brooklyn.entity.AbstractEc2LiveTest;
import org.apache.brooklyn.entity.nosql.cassandra.AstyanaxSupport.AstyanaxSample;
import org.apache.brooklyn.test.EntityTestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
public class CassandraNodeEc2LiveTest extends AbstractEc2LiveTest {
@@ -46,4 +54,28 @@ public class CassandraNodeEc2LiveTest extends AbstractEc2LiveTest {
AstyanaxSample astyanax = new AstyanaxSample(cassandra);
astyanax.astyanaxTest();
}
+
+ @Test(groups = {"Live"})
+ public void testWithOnlyPort22() throws Exception {
+ // CentOS-6.3-x86_64-GA-EBS-02-85586466-5b6c-4495-b580-14f72b4bcf51-ami-bb9af1d2.1
+ jcloudsLocation = mgmt.getLocationRegistry().resolve(LOCATION_SPEC, ImmutableMap.of(
+ "tags", ImmutableList.of(getClass().getName()),
+ "imageId", "us-east-1/ami-a96b01c0",
+ "hardwareId", SMALL_HARDWARE_ID));
+
+ CassandraNode server = app.createAndManageChild(EntitySpec.create(CassandraNode.class)
+ .configure(CassandraNode.PROVISIONING_PROPERTIES.subKey(CloudLocationConfig.INBOUND_PORTS.getName()), ImmutableList.of(22))
+ .configure(CassandraNode.USE_JMX, false)
+ .configure(CassandraNode.USE_THRIFT_MONITORING, false));
+
+ app.start(ImmutableList.of(jcloudsLocation));
+
+ EntityAsserts.assertAttributeEqualsEventually(server, Attributes.SERVICE_UP, true);
+ EntityAsserts.assertAttributeEqualsEventually(server, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+
+ Integer port = server.getAttribute(CassandraNode.THRIFT_PORT);
+ assertNotNull(port);
+
+ assertViaSshLocalPortListeningEventually(server, port);
+ }
}