You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/11/25 13:17:53 UTC

[08/13] incubator-brooklyn git commit: Cassandra: support disabling direct connection

Cassandra: support disabling direct connection


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/0904ed97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/0904ed97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/0904ed97

Branch: refs/heads/master
Commit: 0904ed972813e659d96b706b31333818f4bfae3e
Parents: d32693a
Author: Aled Sage <al...@gmail.com>
Authored: Tue Nov 24 13:25:58 2015 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Nov 24 13:53:47 2015 +0000

----------------------------------------------------------------------
 .../entity/nosql/cassandra/CassandraNode.java   |   3 +
 .../nosql/cassandra/CassandraNodeImpl.java      | 239 ++++++++++---------
 .../cassandra/CassandraNodeEc2LiveTest.java     |  32 +++
 3 files changed, 159 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0904ed97/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java
index 0f42be5..fb937ae 100644
--- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java
@@ -141,6 +141,9 @@ public interface CassandraNode extends DatastoreMixins.DatastoreCommon, Software
     BasicAttributeSensorAndConfigKey<Set<BigInteger>> TOKENS = new BasicAttributeSensorAndConfigKey<Set<BigInteger>>(
             new TypeToken<Set<BigInteger>>() {}, "cassandra.tokens", "Cassandra Tokens");
 
+    @SetFromFlag("useThriftMonitoring")
+    ConfigKey<Boolean> USE_THRIFT_MONITORING = ConfigKeys.newConfigKey("thriftMonitoring.enabled", "Thrift-port monitoring enabled", Boolean.TRUE);
+
     AttributeSensor<Integer> PEERS = Sensors.newIntegerSensor( "cassandra.peers", "Number of peers in cluster");
 
     AttributeSensor<Integer> LIVE_NODE_COUNT = Sensors.newIntegerSensor( "cassandra.liveNodeCount", "Number of live nodes in cluster");

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0904ed97/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java
index e08c99a..43bf4b2 100644
--- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java
@@ -46,6 +46,7 @@ import org.apache.brooklyn.core.sensor.DependentConfiguration;
 import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.enricher.stock.Enrichers;
 import org.apache.brooklyn.entity.java.JavaAppUtils;
+import org.apache.brooklyn.entity.java.UsesJmx;
 import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl;
 import org.apache.brooklyn.feed.function.FunctionFeed;
 import org.apache.brooklyn.feed.function.FunctionPollConfig;
@@ -397,115 +398,119 @@ public class CassandraNodeImpl extends SoftwareProcessImpl implements CassandraN
         jmxHelper = new JmxHelper(this);
         boolean retrieveUsageMetrics = getConfig(RETRIEVE_USAGE_METRICS);
         
-        jmxFeed = JmxFeed.builder()
-                .entity(this)
-                .period(3000, TimeUnit.MILLISECONDS)
-                .helper(jmxHelper)
-                .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP_JMX)
-                        .objectName(storageServiceMBean)
-                        .attributeName("Initialized")
-                        .onSuccess(Functions.forPredicate(Predicates.notNull()))
-                        .onException(Functions.constant(false))
-                        .suppressDuplicates(true))
-                .pollAttribute(new JmxAttributePollConfig<Set<BigInteger>>(TOKENS)
-                        .objectName(storageServiceMBean)
-                        .attributeName("TokenToEndpointMap")
-                        .onSuccess(new Function<Object, Set<BigInteger>>() {
-                            @Override
-                            public Set<BigInteger> apply(@Nullable Object arg) {
-                                Map input = (Map)arg;
-                                if (input == null || input.isEmpty()) return null;
-                                // FIXME does not work on aws-ec2, uses RFC1918 address
-                                Predicate<String> self = Predicates.in(ImmutableList.of(getAttribute(HOSTNAME), getAttribute(ADDRESS), getAttribute(SUBNET_ADDRESS), getAttribute(SUBNET_HOSTNAME)));
-                                Set<String> tokens = Maps.filterValues(input, self).keySet();
-                                Set<BigInteger> result = Sets.newLinkedHashSet();
-                                for (String token : tokens) {
-                                    result.add(new BigInteger(token));
+        if (getDriver().isJmxEnabled()) {
+            jmxFeed = JmxFeed.builder()
+                    .entity(this)
+                    .period(3000, TimeUnit.MILLISECONDS)
+                    .helper(jmxHelper)
+                    .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP_JMX)
+                            .objectName(storageServiceMBean)
+                            .attributeName("Initialized")
+                            .onSuccess(Functions.forPredicate(Predicates.notNull()))
+                            .onException(Functions.constant(false))
+                            .suppressDuplicates(true))
+                    .pollAttribute(new JmxAttributePollConfig<Set<BigInteger>>(TOKENS)
+                            .objectName(storageServiceMBean)
+                            .attributeName("TokenToEndpointMap")
+                            .onSuccess(new Function<Object, Set<BigInteger>>() {
+                                @Override
+                                public Set<BigInteger> apply(@Nullable Object arg) {
+                                    Map input = (Map)arg;
+                                    if (input == null || input.isEmpty()) return null;
+                                    // FIXME does not work on aws-ec2, uses RFC1918 address
+                                    Predicate<String> self = Predicates.in(ImmutableList.of(getAttribute(HOSTNAME), getAttribute(ADDRESS), getAttribute(SUBNET_ADDRESS), getAttribute(SUBNET_HOSTNAME)));
+                                    Set<String> tokens = Maps.filterValues(input, self).keySet();
+                                    Set<BigInteger> result = Sets.newLinkedHashSet();
+                                    for (String token : tokens) {
+                                        result.add(new BigInteger(token));
+                                    }
+                                    return result;
+                                }})
+                            .onException(Functions.<Set<BigInteger>>constant(null))
+                            .suppressDuplicates(true))
+                    .pollOperation(new JmxOperationPollConfig<String>(DATACENTER_NAME)
+                            .period(60, TimeUnit.SECONDS)
+                            .objectName(snitchMBean)
+                            .operationName("getDatacenter")
+                            .operationParams(ImmutableList.of(getBroadcastAddress()))
+                            .onException(Functions.<String>constant(null))
+                            .suppressDuplicates(true))
+                    .pollOperation(new JmxOperationPollConfig<String>(RACK_NAME)
+                            .period(60, TimeUnit.SECONDS)
+                            .objectName(snitchMBean)
+                            .operationName("getRack")
+                            .operationParams(ImmutableList.of(getBroadcastAddress()))
+                            .onException(Functions.<String>constant(null))
+                            .suppressDuplicates(true))
+                    .pollAttribute(new JmxAttributePollConfig<Integer>(PEERS)
+                            .objectName(storageServiceMBean)
+                            .attributeName("TokenToEndpointMap")
+                            .onSuccess(new Function<Object, Integer>() {
+                                @Override
+                                public Integer apply(@Nullable Object arg) {
+                                    Map input = (Map)arg;
+                                    if (input == null || input.isEmpty()) return 0;
+                                    return input.size();
                                 }
-                                return result;
-                            }})
-                        .onException(Functions.<Set<BigInteger>>constant(null))
-                        .suppressDuplicates(true))
-                .pollOperation(new JmxOperationPollConfig<String>(DATACENTER_NAME)
-                        .period(60, TimeUnit.SECONDS)
-                        .objectName(snitchMBean)
-                        .operationName("getDatacenter")
-                        .operationParams(ImmutableList.of(getBroadcastAddress()))
-                        .onException(Functions.<String>constant(null))
-                        .suppressDuplicates(true))
-                .pollOperation(new JmxOperationPollConfig<String>(RACK_NAME)
-                        .period(60, TimeUnit.SECONDS)
-                        .objectName(snitchMBean)
-                        .operationName("getRack")
-                        .operationParams(ImmutableList.of(getBroadcastAddress()))
-                        .onException(Functions.<String>constant(null))
-                        .suppressDuplicates(true))
-                .pollAttribute(new JmxAttributePollConfig<Integer>(PEERS)
-                        .objectName(storageServiceMBean)
-                        .attributeName("TokenToEndpointMap")
-                        .onSuccess(new Function<Object, Integer>() {
-                            @Override
-                            public Integer apply(@Nullable Object arg) {
-                                Map input = (Map)arg;
-                                if (input == null || input.isEmpty()) return 0;
-                                return input.size();
-                            }
-                        })
-                        .onException(Functions.constant(-1)))
-                .pollAttribute(new JmxAttributePollConfig<Integer>(LIVE_NODE_COUNT)
-                        .objectName(storageServiceMBean)
-                        .attributeName("LiveNodes")
-                        .onSuccess(new Function<Object, Integer>() {
-                            @Override
-                            public Integer apply(@Nullable Object arg) {
-                                List input = (List)arg;
-                                if (input == null || input.isEmpty()) return 0;
-                                return input.size();
-                            }
-                        })
-                        .onException(Functions.constant(-1)))
-                .pollAttribute(new JmxAttributePollConfig<Integer>(READ_ACTIVE)
-                        .objectName(readStageMBean)
-                        .attributeName("ActiveCount")
-                        .onException(Functions.constant((Integer)null))
-                        .enabled(retrieveUsageMetrics))
-                .pollAttribute(new JmxAttributePollConfig<Long>(READ_PENDING)
-                        .objectName(readStageMBean)
-                        .attributeName("PendingTasks")
-                        .onException(Functions.constant((Long)null))
-                        .enabled(retrieveUsageMetrics))
-                .pollAttribute(new JmxAttributePollConfig<Long>(READ_COMPLETED)
-                        .objectName(readStageMBean)
-                        .attributeName("CompletedTasks")
-                        .onException(Functions.constant((Long)null))
-                        .enabled(retrieveUsageMetrics))
-                .pollAttribute(new JmxAttributePollConfig<Integer>(WRITE_ACTIVE)
-                        .objectName(mutationStageMBean)
-                        .attributeName("ActiveCount")
-                        .onException(Functions.constant((Integer)null))
-                        .enabled(retrieveUsageMetrics))
-                .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_PENDING)
-                        .objectName(mutationStageMBean)
-                        .attributeName("PendingTasks")
-                        .onException(Functions.constant((Long)null))
-                        .enabled(retrieveUsageMetrics))
-                .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_COMPLETED)
-                        .objectName(mutationStageMBean)
-                        .attributeName("CompletedTasks")
-                        .onException(Functions.constant((Long)null))
-                        .enabled(retrieveUsageMetrics))
-                .build();
-        
-        functionFeed = FunctionFeed.builder()
-                .entity(this)
-                .period(3000, TimeUnit.MILLISECONDS)
-                .poll(new FunctionPollConfig<Long, Long>(THRIFT_PORT_LATENCY)
-                        .onException(Functions.constant(-1L))
-                        .callable(new ThriftLatencyChecker(CassandraNodeImpl.this))
-                        .enabled(retrieveUsageMetrics))
-                .build();
+                            })
+                            .onException(Functions.constant(-1)))
+                    .pollAttribute(new JmxAttributePollConfig<Integer>(LIVE_NODE_COUNT)
+                            .objectName(storageServiceMBean)
+                            .attributeName("LiveNodes")
+                            .onSuccess(new Function<Object, Integer>() {
+                                @Override
+                                public Integer apply(@Nullable Object arg) {
+                                    List input = (List)arg;
+                                    if (input == null || input.isEmpty()) return 0;
+                                    return input.size();
+                                }
+                            })
+                            .onException(Functions.constant(-1)))
+                    .pollAttribute(new JmxAttributePollConfig<Integer>(READ_ACTIVE)
+                            .objectName(readStageMBean)
+                            .attributeName("ActiveCount")
+                            .onException(Functions.constant((Integer)null))
+                            .enabled(retrieveUsageMetrics))
+                    .pollAttribute(new JmxAttributePollConfig<Long>(READ_PENDING)
+                            .objectName(readStageMBean)
+                            .attributeName("PendingTasks")
+                            .onException(Functions.constant((Long)null))
+                            .enabled(retrieveUsageMetrics))
+                    .pollAttribute(new JmxAttributePollConfig<Long>(READ_COMPLETED)
+                            .objectName(readStageMBean)
+                            .attributeName("CompletedTasks")
+                            .onException(Functions.constant((Long)null))
+                            .enabled(retrieveUsageMetrics))
+                    .pollAttribute(new JmxAttributePollConfig<Integer>(WRITE_ACTIVE)
+                            .objectName(mutationStageMBean)
+                            .attributeName("ActiveCount")
+                            .onException(Functions.constant((Integer)null))
+                            .enabled(retrieveUsageMetrics))
+                    .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_PENDING)
+                            .objectName(mutationStageMBean)
+                            .attributeName("PendingTasks")
+                            .onException(Functions.constant((Long)null))
+                            .enabled(retrieveUsageMetrics))
+                    .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_COMPLETED)
+                            .objectName(mutationStageMBean)
+                            .attributeName("CompletedTasks")
+                            .onException(Functions.constant((Long)null))
+                            .enabled(retrieveUsageMetrics))
+                    .build();
+            
+            jmxMxBeanFeed = JavaAppUtils.connectMXBeanSensors(this);
+        }
         
-        jmxMxBeanFeed = JavaAppUtils.connectMXBeanSensors(this);
+        if (Boolean.TRUE.equals(getConfig(USE_THRIFT_MONITORING))) {
+            functionFeed = FunctionFeed.builder()
+                    .entity(this)
+                    .period(3000, TimeUnit.MILLISECONDS)
+                    .poll(new FunctionPollConfig<Long, Long>(THRIFT_PORT_LATENCY)
+                            .onException(Functions.constant(-1L))
+                            .callable(new ThriftLatencyChecker(CassandraNodeImpl.this))
+                            .enabled(retrieveUsageMetrics))
+                    .build();
+        }
         
         connectServiceUpIsRunning();
     }
@@ -530,15 +535,19 @@ public class CassandraNodeImpl extends SoftwareProcessImpl implements CassandraN
         }
         
         // service-up checks
-        enrichers().add(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS)
-                .from(THRIFT_PORT_LATENCY)
-                .computing(Functionals.ifEquals(-1L).value("Thrift latency polling failed") )
-                .build());
+        if (Boolean.TRUE.equals(getConfig(USE_THRIFT_MONITORING))) {
+            enrichers().add(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS)
+                    .from(THRIFT_PORT_LATENCY)
+                    .computing(Functionals.ifEquals(-1L).value("Thrift latency polling failed") )
+                    .build());
+        }
         
-        enrichers().add(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS)
-                .from(SERVICE_UP_JMX)
-                .computing(Functionals.ifEquals(false).value("JMX reports not up") )
-                .build());
+        if (Boolean.TRUE.equals(getConfig(UsesJmx.USE_JMX))) {
+            enrichers().add(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS)
+                    .from(SERVICE_UP_JMX)
+                    .computing(Functionals.ifEquals(false).value("JMX reports not up") )
+                    .build());
+        }
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/0904ed97/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java
index 3899c3b..c1b8b04 100644
--- a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java
+++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java
@@ -18,15 +18,23 @@
  */
 package org.apache.brooklyn.entity.nosql.cassandra;
 
+import static org.testng.Assert.assertNotNull;
+
 import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
+import org.apache.brooklyn.core.location.cloud.CloudLocationConfig;
 import org.apache.brooklyn.entity.AbstractEc2LiveTest;
 import org.apache.brooklyn.entity.nosql.cassandra.AstyanaxSupport.AstyanaxSample;
 import org.apache.brooklyn.test.EntityTestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 
 public class CassandraNodeEc2LiveTest extends AbstractEc2LiveTest {
 
@@ -46,4 +54,28 @@ public class CassandraNodeEc2LiveTest extends AbstractEc2LiveTest {
         AstyanaxSample astyanax = new AstyanaxSample(cassandra);
         astyanax.astyanaxTest();
     }
+    
+    @Test(groups = {"Live"})
+    public void testWithOnlyPort22() throws Exception {
+        // CentOS-6.3-x86_64-GA-EBS-02-85586466-5b6c-4495-b580-14f72b4bcf51-ami-bb9af1d2.1
+        jcloudsLocation = mgmt.getLocationRegistry().resolve(LOCATION_SPEC, ImmutableMap.of(
+                "tags", ImmutableList.of(getClass().getName()),
+                "imageId", "us-east-1/ami-a96b01c0", 
+                "hardwareId", SMALL_HARDWARE_ID));
+
+        CassandraNode server = app.createAndManageChild(EntitySpec.create(CassandraNode.class)
+                .configure(CassandraNode.PROVISIONING_PROPERTIES.subKey(CloudLocationConfig.INBOUND_PORTS.getName()), ImmutableList.of(22))
+                .configure(CassandraNode.USE_JMX, false)
+                .configure(CassandraNode.USE_THRIFT_MONITORING, false));
+        
+        app.start(ImmutableList.of(jcloudsLocation));
+        
+        EntityAsserts.assertAttributeEqualsEventually(server, Attributes.SERVICE_UP, true);
+        EntityAsserts.assertAttributeEqualsEventually(server, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+        
+        Integer port = server.getAttribute(CassandraNode.THRIFT_PORT);
+        assertNotNull(port);
+        
+        assertViaSshLocalPortListeningEventually(server, port);
+    }
 }