You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2014/07/09 16:45:11 UTC

[5/8] git commit: Added support for elastic search cluster

Added support for elastic search cluster


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/587e6999
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/587e6999
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/587e6999

Branch: refs/heads/master
Commit: 587e6999d12bdf71d53dc0cb553eb31927689e88
Parents: e409c68
Author: Martin Harris <gi...@nakomis.com>
Authored: Wed Jun 4 12:17:22 2014 +0100
Committer: Martin Harris <gi...@nakomis.com>
Committed: Tue Jun 17 10:19:58 2014 +0100

----------------------------------------------------------------------
 .../elasticsearch/ElasticSearchCluster.java     |  22 ++++
 .../elasticsearch/ElasticSearchClusterImpl.java | 104 +++++++++++++++++++
 .../nosql/elasticsearch/ElasticSearchNode.java  |  30 ++++--
 .../elasticsearch/ElasticSearchNodeImpl.java    |  29 +++++-
 .../ElasticSearchNodeSshDriver.java             |  12 ++-
 5 files changed, 181 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/587e6999/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java
new file mode 100644
index 0000000..4f6448f
--- /dev/null
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java
@@ -0,0 +1,22 @@
+package brooklyn.entity.nosql.elasticsearch;
+
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.Sensors;
+import brooklyn.util.flags.SetFromFlag;
+
+/**
+ * A cluster of {@link ElasticSearchNode}s based on {@link DynamicCluster} which can be resized by a policy if required.
+ */
+@ImplementedBy(ElasticSearchClusterImpl.class)
+public interface ElasticSearchCluster extends DynamicCluster {
+    @SetFromFlag("clusterName")
+    BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = new BasicAttributeSensorAndConfigKey<String>(String.class, 
+            "elasticsearch.cluster.name", "Name of the ElasticSearch cluster", "BrooklynCluster");
+    
+    AttributeSensor<String> NODE_LIST = Sensors.newStringSensor("elasticsearch.cluster.node.list", "Comma delimited list of nodes in hostname:port format");
+    
+    String getName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/587e6999/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java
new file mode 100644
index 0000000..5e1deb6
--- /dev/null
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java
@@ -0,0 +1,104 @@
+package brooklyn.entity.nosql.elasticsearch;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
+import brooklyn.entity.group.DynamicClusterImpl;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.location.Location;
+import brooklyn.policy.PolicySpec;
+import brooklyn.util.text.Strings;
+
+public class ElasticSearchClusterImpl extends DynamicClusterImpl implements ElasticSearchCluster {
+    
+    private AtomicInteger nextMemberId = new AtomicInteger(0);
+    private MemberTrackingPolicy policy;
+    
+    public ElasticSearchClusterImpl() {
+        
+    }
+    
+    @Override
+    public void init() {
+        policy = addPolicy(PolicySpec.create(MemberTrackingPolicy.class)
+                .displayName(getName() + " membership tracker")
+                .configure("group", this) 
+                .configure(AbstractMembershipTrackingPolicy.NOTIFY_ON_DUPLICATES, false));
+    }
+    
+    @Override
+    public void start(Collection<? extends Location> locations) {
+        super.start(locations);
+    }
+    
+    @Override
+    public void stop() {
+        if (policy != null) {
+            removePolicy(policy);
+        }
+        super.stop();
+    }
+
+    @Override
+    protected boolean calculateServiceUp() {
+        boolean up = false;
+        for (Entity member : getMembers()) {
+            if (Boolean.TRUE.equals(member.getAttribute(SERVICE_UP))) up = true;
+        }
+        return up;
+    }
+    
+    @Override
+    protected EntitySpec<?> getMemberSpec() {
+        @SuppressWarnings("unchecked")
+        EntitySpec<ElasticSearchNode> spec = (EntitySpec<ElasticSearchNode>)getConfig(MEMBER_SPEC, EntitySpec.create(ElasticSearchNode.class));
+        
+        spec.configure(ElasticSearchNode.CLUSTER_NAME, getConfig(ElasticSearchClusterImpl.CLUSTER_NAME))
+            .configure(ElasticSearchNode.MULTICAST_ENABLED, false)
+            .configure(ElasticSearchNode.UNICAST_ENABLED, false)
+            .configure(ElasticSearchNode.NODE_NAME, "elasticsearch-" + nextMemberId.incrementAndGet());
+        
+        return spec;
+    }
+    
+    @Override
+    public String getName() {
+        return getConfig(CLUSTER_NAME);
+    }
+    
+    private void resetCluster() {
+        String nodeList = "";
+        for (Entity entity : getMembers()) {
+            nodeList += getHostAndPort(entity) + ",";
+        }
+        if (!nodeList.isEmpty()) {
+            for (Entity entity : getMembers()) {
+                String otherNodesList = Strings.removeFromEnd(nodeList.replace(getHostAndPort(entity) + ",", ""), ",");
+                if (!otherNodesList.isEmpty()) {
+                    ((ElasticSearchNode)entity).resetCluster(otherNodesList);
+                }
+            }
+            
+        }
+        setAttribute(NODE_LIST, Strings.removeFromEnd(nodeList, ","));
+    }
+    
+    private String getHostAndPort(Entity entity) {
+        return entity.getAttribute(Attributes.HOSTNAME) + ":" + entity.getAttribute(Attributes.HTTP_PORT);
+    }
+    
+    public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
+        @Override protected void onEntityChange(Entity member) {
+            ((ElasticSearchClusterImpl)entity).resetCluster();
+        }
+        @Override protected void onEntityAdded(Entity member) {
+            ((ElasticSearchClusterImpl)entity).resetCluster();
+        }
+        @Override protected void onEntityRemoved(Entity member) {
+            ((ElasticSearchClusterImpl)entity).resetCluster();
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/587e6999/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
index 1eb4fda..e9ec525 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
@@ -27,14 +27,22 @@ public interface ElasticSearchNode extends SoftwareProcess, DatastoreMixins.HasD
             SoftwareProcess.DOWNLOAD_URL, "https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-${version}.tar.gz");
     
     @SetFromFlag("dataDir")
-    ConfigKey<String> DATA_DIR = ConfigKeys.newStringConfigKey("elasticsearch.data.dir", "Directory for writing data files", null);
+    ConfigKey<String> DATA_DIR = ConfigKeys.newStringConfigKey("elasticsearch.node.data.dir", "Directory for writing data files", null);
     
     @SetFromFlag("logDir")
-    ConfigKey<String> LOG_DIR = ConfigKeys.newStringConfigKey("elasticsearch.log.dir", "Directory for writing log files", null);
+    ConfigKey<String> LOG_DIR = ConfigKeys.newStringConfigKey("elasticsearch.node.log.dir", "Directory for writing log files", null);
     
     @SetFromFlag("configFileUrl")
     ConfigKey<String> TEMPLATE_CONFIGURATION_URL = ConfigKeys.newStringConfigKey(
-            "elasticsearch.template.configuration.url", "URL where the elasticsearch configuration file (in freemarker format) can be found");
+            "elasticsearch.node.template.configuration.url", "URL where the elasticsearch configuration file (in freemarker format) can be found", null);
+    
+    @SetFromFlag("multicastEnabled")
+    ConfigKey<Boolean> MULTICAST_ENABLED = ConfigKeys.newBooleanConfigKey("elasticsearch.node.multicast.enabled", 
+            "Indicates whether zen discovery multicast should be enabled for a node", null);
+    
+    @SetFromFlag("multicastEnabled")
+    ConfigKey<Boolean> UNICAST_ENABLED = ConfigKeys.newBooleanConfigKey("elasticsearch.node.UNicast.enabled", 
+            "Indicates whether zen discovery unicast should be enabled for a node", null);
     
     @SetFromFlag("httpPort")
     PortAttributeSensorAndConfigKey HTTP_PORT = new PortAttributeSensorAndConfigKey(WebAppServiceConstants.HTTP_PORT, PortRanges.fromString("9200+"));
@@ -44,14 +52,16 @@ public interface ElasticSearchNode extends SoftwareProcess, DatastoreMixins.HasD
             "Node name (or randomly selected if not set", null);
     
     @SetFromFlag("clusterName")
-    StringAttributeSensorAndConfigKey CLUSTER_NAME = new StringAttributeSensorAndConfigKey("elasticsearch.cluster.name", 
+    StringAttributeSensorAndConfigKey CLUSTER_NAME = new StringAttributeSensorAndConfigKey("elasticsearch.node.cluster.name", 
             "Cluster name (or elasticsearch selected if not set", null);
     
     AttributeSensor<String> NODE_ID = Sensors.newStringSensor("elasticsearch.node.id");
-    AttributeSensor<Integer> DOCUMENT_COUNT = Sensors.newIntegerSensor("elasticsearch.docs.count");
-    AttributeSensor<Integer> STORE_BYTES = Sensors.newIntegerSensor("elasticsearch.store.bytes");
-    AttributeSensor<Integer> GET_TOTAL = Sensors.newIntegerSensor("elasticsearch.get.total");
-    AttributeSensor<Integer> GET_TIME_IN_MILLIS = Sensors.newIntegerSensor("elasticsearch.get.time.in.millis");
-    AttributeSensor<Integer> SEARCH_QUERY_TOTAL = Sensors.newIntegerSensor("elasticsearch.search.query.total");
-    AttributeSensor<Integer> SEARCH_QUERY_TIME_IN_MILLIS = Sensors.newIntegerSensor("elasticsearch.search.query.time.in.millis");
+    AttributeSensor<Integer> DOCUMENT_COUNT = Sensors.newIntegerSensor("elasticsearch.node.docs.count");
+    AttributeSensor<Integer> STORE_BYTES = Sensors.newIntegerSensor("elasticsearch.node.store.bytes");
+    AttributeSensor<Integer> GET_TOTAL = Sensors.newIntegerSensor("elasticsearch.node.get.total");
+    AttributeSensor<Integer> GET_TIME_IN_MILLIS = Sensors.newIntegerSensor("elasticsearch.node.get.time.in.millis");
+    AttributeSensor<Integer> SEARCH_QUERY_TOTAL = Sensors.newIntegerSensor("elasticsearch.node.search.query.total");
+    AttributeSensor<Integer> SEARCH_QUERY_TIME_IN_MILLIS = Sensors.newIntegerSensor("elasticsearch.node.search.query.time.in.millis");
+    
+    void resetCluster(String nodeList);
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/587e6999/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
index 062e14b..fafe67f 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
@@ -1,16 +1,27 @@
 package brooklyn.entity.nosql.elasticsearch;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.http.client.HttpClient;
+import org.bouncycastle.util.Strings;
+
+import brooklyn.entity.basic.Attributes;
 import brooklyn.entity.basic.SoftwareProcessImpl;
 import brooklyn.event.feed.http.HttpFeed;
 import brooklyn.event.feed.http.HttpPollConfig;
 import brooklyn.event.feed.http.HttpValueFunctions;
 import brooklyn.event.feed.http.JsonFunctions;
 import brooklyn.location.access.BrooklynAccessUtils;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.http.HttpTool;
 import brooklyn.util.http.HttpToolResponse;
 
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.net.HostAndPort;
 import com.google.gson.JsonElement;
 
@@ -61,9 +72,6 @@ public class ElasticSearchNodeImpl extends SoftwareProcessImpl implements Elasti
             .poll(new HttpPollConfig<String>(NODE_NAME)
                 .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("name"), JsonFunctions.cast(String.class)))
                 .onFailureOrException(Functions.<String>constant(null)))
-            .poll(new HttpPollConfig<String>(CLUSTER_NAME)
-                .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("settings", "cluster", "name"), JsonFunctions.cast(String.class)))
-                .onFailureOrException(Functions.<String>constant(null)))
             .poll(new HttpPollConfig<Integer>(DOCUMENT_COUNT)
                 .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "docs", "count"), JsonFunctions.cast(Integer.class)))
                 .onFailureOrException(Functions.<Integer>constant(null)))
@@ -88,6 +96,21 @@ public class ElasticSearchNodeImpl extends SoftwareProcessImpl implements Elasti
     }
     
     @Override
+    public void resetCluster(String nodeList) {
+        URI updateClusterUri;
+        try {
+            updateClusterUri = new URI(String.format("http://%s:%s/_cluster/settings", getAttribute(Attributes.HOSTNAME), getAttribute(HTTP_PORT)));
+        } catch (URISyntaxException e) {
+            throw Exceptions.propagate(e);
+        }
+        HttpClient client = HttpTool.httpClientBuilder().build();
+        
+        String payload = String.format("{\"persistent\":{\"discovery.zen.ping.unicast.hosts\":\"%s\"}}", nodeList);
+        
+        HttpToolResponse result = HttpTool.httpPut(client, updateClusterUri, ImmutableMap.<String, String>of(), Strings.toByteArray(payload));
+    }
+    
+    @Override
     protected void disconnectSensors() {
         if (httpFeed != null) {
             httpFeed.stop();

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/587e6999/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
index 3a695cb..2dda090 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
@@ -33,6 +33,7 @@ public class ElasticSearchNodeSshDriver extends AbstractSoftwareProcessSshDriver
         
         List<String> commands = ImmutableList.<String>builder()
             .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs))
+            .add(BashCommands.installJavaLatestOrFail())
             .add(String.format("tar zxvf %s", saveAs))
             .build();
         
@@ -70,6 +71,8 @@ public class ElasticSearchNodeSshDriver extends AbstractSoftwareProcessSshDriver
         appendConfigIfPresent(commandBuilder, ElasticSearchNode.LOG_DIR, "es.path.logs", Os.mergePaths(getRunDir(), "logs"));
         appendConfigIfPresent(commandBuilder, ElasticSearchNode.NODE_NAME.getConfigKey(), "es.node.name");
         appendConfigIfPresent(commandBuilder, ElasticSearchNode.CLUSTER_NAME.getConfigKey(), "es.cluster.name");
+        appendConfigIfPresent(commandBuilder, ElasticSearchNode.MULTICAST_ENABLED, "es.discovery.zen.ping.multicast.enabled");
+        appendConfigIfPresent(commandBuilder, ElasticSearchNode.UNICAST_ENABLED, "es.discovery.zen.ping.unicast.enabled");
         commandBuilder.append(" > out.log 2> err.log < /dev/null");
         newScript(MutableMap.of("usePidFile", false), LAUNCHING)
             .updateTaskAndFailOnNonZeroResultCode()
@@ -77,12 +80,15 @@ public class ElasticSearchNodeSshDriver extends AbstractSoftwareProcessSshDriver
             .execute();
     }
     
-    private void appendConfigIfPresent(StringBuilder builder, ConfigKey<String> configKey, String parameter) {
+    private void appendConfigIfPresent(StringBuilder builder, ConfigKey<?> configKey, String parameter) {
         appendConfigIfPresent(builder, configKey, parameter, null);
     }
     
-    private void appendConfigIfPresent(StringBuilder builder, ConfigKey<String> configKey, String parameter, String defaultValue) {
-        String config = entity.getConfig(configKey);
+    private void appendConfigIfPresent(StringBuilder builder, ConfigKey<?> configKey, String parameter, String defaultValue) {
+        String config = null;
+        if (entity.getConfig(configKey) != null) {
+            config = String.valueOf(entity.getConfig(configKey));
+        }
         if (config == null && defaultValue != null) {
             config = defaultValue;
         }