You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2014/07/09 16:45:07 UTC

[1/8] git commit: Added additional sensors

Repository: incubator-brooklyn
Updated Branches:
  refs/heads/master 65bc06adc -> 2ebc307a3


Added additional sensors


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/62c7d20f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/62c7d20f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/62c7d20f

Branch: refs/heads/master
Commit: 62c7d20f2a1477fbe9291d491891eebb6e70c55f
Parents: b158aa3
Author: Martin Harris <gi...@nakomis.com>
Authored: Fri May 30 12:33:45 2014 +0100
Committer: Martin Harris <gi...@nakomis.com>
Committed: Tue Jun 17 10:19:57 2014 +0100

----------------------------------------------------------------------
 .../nosql/elasticsearch/ElasticSearchNode.java  |  6 ++++
 .../elasticsearch/ElasticSearchNodeImpl.java    | 38 +++++++++++++++-----
 2 files changed, 36 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/62c7d20f/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
index 30d80e9..32ec9af 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
@@ -31,4 +31,10 @@ public interface ElasticSearchNode extends SoftwareProcess, DatastoreMixins.HasD
     AttributeSensor<String> NODE_ID = Sensors.newStringSensor("elasticsearch.node.id");
     AttributeSensor<String> NODE_NAME = Sensors.newStringSensor("elasticsearch.node.name");
     AttributeSensor<String> CLUSTER_NAME = Sensors.newStringSensor("elasticsearch.cluster.name");
+    AttributeSensor<Integer> DOCUMENT_COUNT = Sensors.newIntegerSensor("elasticsearch.docs.count");
+    AttributeSensor<Integer> STORE_BYTES = Sensors.newIntegerSensor("elasticsearch.store.bytes");
+    AttributeSensor<Integer> GET_TOTAL = Sensors.newIntegerSensor("elasticsearch.get.total");
+    AttributeSensor<Integer> GET_TIME_IN_MILLIS = Sensors.newIntegerSensor("elasticsearch.get.time.in.millis");
+    AttributeSensor<Integer> SEARCH_QUERY_TOTAL = Sensors.newIntegerSensor("elasticsearch.search.query.total");
+    AttributeSensor<Integer> SEARCH_QUERY_TIME_IN_MILLIS = Sensors.newIntegerSensor("elasticsearch.search.query.time.in.millis");
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/62c7d20f/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
index 92c809c..f25d0bf 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
@@ -7,6 +7,7 @@ import brooklyn.event.feed.http.HttpPollConfig;
 import brooklyn.event.feed.http.HttpValueFunctions;
 import brooklyn.event.feed.http.JsonFunctions;
 import brooklyn.location.access.BrooklynAccessUtils;
+import brooklyn.util.http.HttpToolResponse;
 
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
@@ -37,15 +38,20 @@ public class ElasticSearchNodeImpl extends SoftwareProcessImpl implements Elasti
                 return input.getAsJsonObject().entrySet().iterator().next().getKey();
             }
         };
-        Function<JsonElement, JsonElement> getFirstNode = new Function<JsonElement, JsonElement>() {
+        
+        Function<JsonElement, JsonElement> getFirstNodeFromNodes = new Function<JsonElement, JsonElement>() {
             @Override public JsonElement apply(JsonElement input) {
                 return input.getAsJsonObject().entrySet().iterator().next().getValue();
             }
         };
+        
+        Function<HttpToolResponse, JsonElement> getFirstNode = HttpValueFunctions.chain(HttpValueFunctions.jsonContents(), 
+            JsonFunctions.walk("nodes"), getFirstNodeFromNodes);
+                
         httpFeed = HttpFeed.builder()
             .entity(this)
             .period(1000)
-            .baseUri(String.format("http://%s:%s/_nodes/_local", hp.getHostText(), hp.getPort()))
+            .baseUri(String.format("http://%s:%s/_nodes/_local/stats", hp.getHostText(), hp.getPort()))
             .poll(new HttpPollConfig<Boolean>(SERVICE_UP)
                 .onSuccess(HttpValueFunctions.responseCodeEquals(200))
                 .onFailureOrException(Functions.constant(false)))
@@ -53,13 +59,29 @@ public class ElasticSearchNodeImpl extends SoftwareProcessImpl implements Elasti
                         .onSuccess(HttpValueFunctions.chain(HttpValueFunctions.jsonContents(), JsonFunctions.walk("nodes"), getNodeId))
                         .onFailureOrException(Functions.constant("")))
             .poll(new HttpPollConfig<String>(NODE_NAME)
-                .onSuccess(HttpValueFunctions.chain(HttpValueFunctions.chain(HttpValueFunctions.jsonContents(), JsonFunctions.walk("nodes"), getFirstNode), 
-                        JsonFunctions.walk("name"), JsonFunctions.cast(String.class)))
-                .onFailureOrException(Functions.constant("")))
+                .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("name"), JsonFunctions.cast(String.class)))
+                .onFailureOrException(Functions.<String>constant(null)))
             .poll(new HttpPollConfig<String>(CLUSTER_NAME)
-                .onSuccess(HttpValueFunctions.chain(HttpValueFunctions.chain(HttpValueFunctions.jsonContents(), JsonFunctions.walk("nodes"), getFirstNode), 
-                        JsonFunctions.walk("settings", "cluster", "name"), JsonFunctions.cast(String.class)))
-                .onFailureOrException(Functions.constant("")))
+                .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("settings", "cluster", "name"), JsonFunctions.cast(String.class)))
+                .onFailureOrException(Functions.<String>constant(null)))
+            .poll(new HttpPollConfig<Integer>(DOCUMENT_COUNT)
+                .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "docs", "count"), JsonFunctions.cast(Integer.class)))
+                .onFailureOrException(Functions.<Integer>constant(null)))
+            .poll(new HttpPollConfig<Integer>(STORE_BYTES)
+                .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "store", "size_in_bytes"), JsonFunctions.cast(Integer.class)))
+                .onFailureOrException(Functions.<Integer>constant(null)))
+            .poll(new HttpPollConfig<Integer>(GET_TOTAL)
+                .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "get", "total"), JsonFunctions.cast(Integer.class)))
+                .onFailureOrException(Functions.<Integer>constant(null)))
+            .poll(new HttpPollConfig<Integer>(GET_TIME_IN_MILLIS)
+                .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "get", "time_in_millis"), JsonFunctions.cast(Integer.class)))
+                .onFailureOrException(Functions.<Integer>constant(null)))
+            .poll(new HttpPollConfig<Integer>(SEARCH_QUERY_TOTAL)
+                .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "search", "query_total"), JsonFunctions.cast(Integer.class)))
+                .onFailureOrException(Functions.<Integer>constant(null)))
+            .poll(new HttpPollConfig<Integer>(SEARCH_QUERY_TIME_IN_MILLIS)
+                .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "search", "query_time_in_millis"), JsonFunctions.cast(Integer.class)))
+                .onFailureOrException(Functions.<Integer>constant(null)))
             .build();
     }
     


[6/8] git commit: Updated following PR review

Posted by al...@apache.org.
Updated following PR review


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/6572c027
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/6572c027
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/6572c027

Branch: refs/heads/master
Commit: 6572c027b6e33dcded034113f19d9d4a28fe8fff
Parents: 20507af
Author: Martin Harris <gi...@nakomis.com>
Authored: Tue Jun 17 12:31:48 2014 +0100
Committer: Martin Harris <gi...@nakomis.com>
Committed: Tue Jun 17 12:31:48 2014 +0100

----------------------------------------------------------------------
 .../elasticsearch/ElasticSearchCluster.java     |  4 +-
 .../elasticsearch/ElasticSearchClusterImpl.java | 15 +---
 .../elasticsearch/ElasticSearchNodeImpl.java    | 92 +++++++++-----------
 .../ElasticSearchNodeSshDriver.java             |  2 +-
 4 files changed, 44 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6572c027/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java
index fea777e..a026d17 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java
@@ -2,9 +2,7 @@ package brooklyn.entity.nosql.elasticsearch;
 
 import brooklyn.entity.group.DynamicCluster;
 import brooklyn.entity.proxying.ImplementedBy;
-import brooklyn.event.AttributeSensor;
 import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
-import brooklyn.event.basic.Sensors;
 import brooklyn.util.flags.SetFromFlag;
 
 /**
@@ -16,5 +14,5 @@ public interface ElasticSearchCluster extends DynamicCluster {
     BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = new BasicAttributeSensorAndConfigKey<String>(String.class, 
             "elasticsearch.cluster.name", "Name of the ElasticSearch cluster", "BrooklynCluster");
     
-    String getName();
+    String getClusterName();
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6572c027/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java
index fe60c4a..32f8706 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java
@@ -1,26 +1,15 @@
 package brooklyn.entity.nosql.elasticsearch;
 
-import java.util.Collection;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import brooklyn.entity.Entity;
 import brooklyn.entity.group.DynamicClusterImpl;
 import brooklyn.entity.proxying.EntitySpec;
-import brooklyn.location.Location;
 
 public class ElasticSearchClusterImpl extends DynamicClusterImpl implements ElasticSearchCluster {
     
     private AtomicInteger nextMemberId = new AtomicInteger(0);
-    
-    public ElasticSearchClusterImpl() {
-        
-    }
-    
-    @Override
-    public void start(Collection<? extends Location> locations) {
-        super.start(locations);
-    }
-    
+
     @Override
     protected boolean calculateServiceUp() {
         boolean up = false;
@@ -42,7 +31,7 @@ public class ElasticSearchClusterImpl extends DynamicClusterImpl implements Elas
     }
     
     @Override
-    public String getName() {
+    public String getClusterName() {
         return getConfig(CLUSTER_NAME);
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6572c027/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
index ec01539..e625aff 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
@@ -1,64 +1,66 @@
 package brooklyn.entity.nosql.elasticsearch;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.apache.http.client.HttpClient;
-import org.bouncycastle.util.Strings;
-
-import brooklyn.entity.basic.Attributes;
 import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.event.AttributeSensor;
 import brooklyn.event.feed.http.HttpFeed;
 import brooklyn.event.feed.http.HttpPollConfig;
 import brooklyn.event.feed.http.HttpValueFunctions;
 import brooklyn.event.feed.http.JsonFunctions;
 import brooklyn.location.access.BrooklynAccessUtils;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.http.HttpTool;
+import brooklyn.util.guava.Functionals;
+import brooklyn.util.guava.Maybe;
+import brooklyn.util.guava.MaybeFunctions;
+import brooklyn.util.guava.TypeTokens;
 import brooklyn.util.http.HttpToolResponse;
 
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.net.HostAndPort;
 import com.google.gson.JsonElement;
 
 public class ElasticSearchNodeImpl extends SoftwareProcessImpl implements ElasticSearchNode {
     
-    HttpFeed httpFeed;
+    protected static final Function<Maybe<JsonElement>, Maybe<JsonElement>> GET_FIRST_NODE_FROM_NODES = new Function<Maybe<JsonElement>, Maybe<JsonElement>>() {
+        @Override public Maybe<JsonElement> apply(Maybe<JsonElement> input) {
+            if (input.isAbsent()) {
+                return input;
+            }
+            return Maybe.fromNullable(input.get().getAsJsonObject().entrySet().iterator().next().getValue());
+        }
+    };
     
-    public ElasticSearchNodeImpl() {
-        
-    }
+    protected static final Function<HttpToolResponse, Maybe<JsonElement>> GET_FIRST_NODE = Functionals.chain(HttpValueFunctions.jsonContents(), 
+            MaybeFunctions.<JsonElement>wrap(), JsonFunctions.walkM("nodes"), GET_FIRST_NODE_FROM_NODES);
+    
+    
+    HttpFeed httpFeed;
 
     @Override
     public Class<ElasticSearchNodeDriver> getDriverInterface() {
         return ElasticSearchNodeDriver.class;
     }
     
+    protected static final <T> HttpPollConfig<T> getSensorFromNodeStat(AttributeSensor<T> sensor, String... jsonPath) {
+        return new HttpPollConfig<T>(sensor)
+            .onSuccess(Functionals.chain(GET_FIRST_NODE, JsonFunctions.walkM(jsonPath), JsonFunctions.castM(TypeTokens.getRawRawType(sensor.getTypeToken()), null)))
+            .onFailureOrException(Functions.<T>constant(null));
+    }
+    
     @Override
     protected void connectSensors() {
         super.connectSensors();
         Integer rawPort = getAttribute(HTTP_PORT);
         checkNotNull(rawPort, "HTTP_PORT sensors not set for %s; is an acceptable port available?", this);
         HostAndPort hp = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, rawPort);
-        Function<JsonElement, String> getNodeId = new Function<JsonElement, String>() {
-            @Override public String apply(JsonElement input) {
-                return input.getAsJsonObject().entrySet().iterator().next().getKey();
-            }
-        };
-        
-        Function<JsonElement, JsonElement> getFirstNodeFromNodes = new Function<JsonElement, JsonElement>() {
-            @Override public JsonElement apply(JsonElement input) {
-                return input.getAsJsonObject().entrySet().iterator().next().getValue();
+        Function<Maybe<JsonElement>, String> getNodeId = new Function<Maybe<JsonElement>, String>() {
+            @Override public String apply(Maybe<JsonElement> input) {
+                if (input.isAbsent()) {
+                    return null;
+                }
+                return input.get().getAsJsonObject().entrySet().iterator().next().getKey();
             }
         };
-        
-        Function<HttpToolResponse, JsonElement> getFirstNode = HttpValueFunctions.chain(HttpValueFunctions.jsonContents(), 
-            JsonFunctions.walk("nodes"), getFirstNodeFromNodes);
-                
         httpFeed = HttpFeed.builder()
             .entity(this)
             .period(1000)
@@ -66,30 +68,16 @@ public class ElasticSearchNodeImpl extends SoftwareProcessImpl implements Elasti
             .poll(new HttpPollConfig<Boolean>(SERVICE_UP)
                 .onSuccess(HttpValueFunctions.responseCodeEquals(200))
                 .onFailureOrException(Functions.constant(false)))
-                .poll(new HttpPollConfig<String>(NODE_ID)
-                        .onSuccess(HttpValueFunctions.chain(HttpValueFunctions.jsonContents(), JsonFunctions.walk("nodes"), getNodeId))
-                        .onFailureOrException(Functions.constant("")))
-            .poll(new HttpPollConfig<String>(NODE_NAME)
-                .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("name"), JsonFunctions.cast(String.class)))
-                .onFailureOrException(Functions.<String>constant(null)))
-            .poll(new HttpPollConfig<Integer>(DOCUMENT_COUNT)
-                .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "docs", "count"), JsonFunctions.cast(Integer.class)))
-                .onFailureOrException(Functions.<Integer>constant(null)))
-            .poll(new HttpPollConfig<Integer>(STORE_BYTES)
-                .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "store", "size_in_bytes"), JsonFunctions.cast(Integer.class)))
-                .onFailureOrException(Functions.<Integer>constant(null)))
-            .poll(new HttpPollConfig<Integer>(GET_TOTAL)
-                .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "get", "total"), JsonFunctions.cast(Integer.class)))
-                .onFailureOrException(Functions.<Integer>constant(null)))
-            .poll(new HttpPollConfig<Integer>(GET_TIME_IN_MILLIS)
-                .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "get", "time_in_millis"), JsonFunctions.cast(Integer.class)))
-                .onFailureOrException(Functions.<Integer>constant(null)))
-            .poll(new HttpPollConfig<Integer>(SEARCH_QUERY_TOTAL)
-                .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "search", "query_total"), JsonFunctions.cast(Integer.class)))
-                .onFailureOrException(Functions.<Integer>constant(null)))
-            .poll(new HttpPollConfig<Integer>(SEARCH_QUERY_TIME_IN_MILLIS)
-                .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "search", "query_time_in_millis"), JsonFunctions.cast(Integer.class)))
-                .onFailureOrException(Functions.<Integer>constant(null)))
+            .poll(new HttpPollConfig<String>(NODE_ID)
+                .onSuccess(Functionals.chain(HttpValueFunctions.jsonContents(), MaybeFunctions.<JsonElement>wrap(), JsonFunctions.walkM("nodes"), getNodeId))
+                .onFailureOrException(Functions.constant("")))
+            .poll(getSensorFromNodeStat(NODE_NAME, "name"))
+            .poll(getSensorFromNodeStat(DOCUMENT_COUNT, "indices", "docs", "count"))
+            .poll(getSensorFromNodeStat(STORE_BYTES, "indices", "store", "size_in_bytes"))
+            .poll(getSensorFromNodeStat(GET_TOTAL, "indices", "get", "total"))
+            .poll(getSensorFromNodeStat(GET_TIME_IN_MILLIS, "indices", "get", "time_in_millis"))
+            .poll(getSensorFromNodeStat(SEARCH_QUERY_TOTAL, "indices", "search", "query_total"))
+            .poll(getSensorFromNodeStat(SEARCH_QUERY_TIME_IN_MILLIS, "indices", "search", "query_time_in_millis"))
             .poll(new HttpPollConfig<String>(CLUSTER_NAME)
                 .onSuccess(HttpValueFunctions.jsonContents("cluster_name", String.class)))
             .build();

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6572c027/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
index 2dda090..748975b 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
@@ -32,8 +32,8 @@ public class ElasticSearchNodeSshDriver extends AbstractSoftwareProcessSshDriver
         String saveAs = resolver.getFilename();
         
         List<String> commands = ImmutableList.<String>builder()
-            .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs))
             .add(BashCommands.installJavaLatestOrFail())
+            .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs))
             .add(String.format("tar zxvf %s", saveAs))
             .build();
         


[3/8] git commit: Fixed (and simplified) clustering, using automatic zen discovery

Posted by al...@apache.org.
Fixed (and simplified) clustering, using automatic zen discovery


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/20507af0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/20507af0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/20507af0

Branch: refs/heads/master
Commit: 20507af0818ed93f48cee185fde64a8ac9a4a0b4
Parents: 587e699
Author: Martin Harris <gi...@nakomis.com>
Authored: Wed Jun 4 13:33:30 2014 +0100
Committer: Martin Harris <gi...@nakomis.com>
Committed: Tue Jun 17 10:19:58 2014 +0100

----------------------------------------------------------------------
 .../elasticsearch/ElasticSearchCluster.java     |  2 -
 .../elasticsearch/ElasticSearchClusterImpl.java | 55 --------------------
 .../nosql/elasticsearch/ElasticSearchNode.java  |  3 +-
 .../elasticsearch/ElasticSearchNodeImpl.java    | 15 ------
 4 files changed, 1 insertion(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/20507af0/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java
index 4f6448f..fea777e 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java
@@ -16,7 +16,5 @@ public interface ElasticSearchCluster extends DynamicCluster {
     BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = new BasicAttributeSensorAndConfigKey<String>(String.class, 
             "elasticsearch.cluster.name", "Name of the ElasticSearch cluster", "BrooklynCluster");
     
-    AttributeSensor<String> NODE_LIST = Sensors.newStringSensor("elasticsearch.cluster.node.list", "Comma delimited list of nodes in hostname:port format");
-    
     String getName();
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/20507af0/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java
index 5e1deb6..fe60c4a 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java
@@ -4,45 +4,24 @@ import java.util.Collection;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import brooklyn.entity.Entity;
-import brooklyn.entity.basic.Attributes;
-import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
 import brooklyn.entity.group.DynamicClusterImpl;
 import brooklyn.entity.proxying.EntitySpec;
 import brooklyn.location.Location;
-import brooklyn.policy.PolicySpec;
-import brooklyn.util.text.Strings;
 
 public class ElasticSearchClusterImpl extends DynamicClusterImpl implements ElasticSearchCluster {
     
     private AtomicInteger nextMemberId = new AtomicInteger(0);
-    private MemberTrackingPolicy policy;
     
     public ElasticSearchClusterImpl() {
         
     }
     
     @Override
-    public void init() {
-        policy = addPolicy(PolicySpec.create(MemberTrackingPolicy.class)
-                .displayName(getName() + " membership tracker")
-                .configure("group", this) 
-                .configure(AbstractMembershipTrackingPolicy.NOTIFY_ON_DUPLICATES, false));
-    }
-    
-    @Override
     public void start(Collection<? extends Location> locations) {
         super.start(locations);
     }
     
     @Override
-    public void stop() {
-        if (policy != null) {
-            removePolicy(policy);
-        }
-        super.stop();
-    }
-
-    @Override
     protected boolean calculateServiceUp() {
         boolean up = false;
         for (Entity member : getMembers()) {
@@ -57,8 +36,6 @@ public class ElasticSearchClusterImpl extends DynamicClusterImpl implements Elas
         EntitySpec<ElasticSearchNode> spec = (EntitySpec<ElasticSearchNode>)getConfig(MEMBER_SPEC, EntitySpec.create(ElasticSearchNode.class));
         
         spec.configure(ElasticSearchNode.CLUSTER_NAME, getConfig(ElasticSearchClusterImpl.CLUSTER_NAME))
-            .configure(ElasticSearchNode.MULTICAST_ENABLED, false)
-            .configure(ElasticSearchNode.UNICAST_ENABLED, false)
             .configure(ElasticSearchNode.NODE_NAME, "elasticsearch-" + nextMemberId.incrementAndGet());
         
         return spec;
@@ -69,36 +46,4 @@ public class ElasticSearchClusterImpl extends DynamicClusterImpl implements Elas
         return getConfig(CLUSTER_NAME);
     }
     
-    private void resetCluster() {
-        String nodeList = "";
-        for (Entity entity : getMembers()) {
-            nodeList += getHostAndPort(entity) + ",";
-        }
-        if (!nodeList.isEmpty()) {
-            for (Entity entity : getMembers()) {
-                String otherNodesList = Strings.removeFromEnd(nodeList.replace(getHostAndPort(entity) + ",", ""), ",");
-                if (!otherNodesList.isEmpty()) {
-                    ((ElasticSearchNode)entity).resetCluster(otherNodesList);
-                }
-            }
-            
-        }
-        setAttribute(NODE_LIST, Strings.removeFromEnd(nodeList, ","));
-    }
-    
-    private String getHostAndPort(Entity entity) {
-        return entity.getAttribute(Attributes.HOSTNAME) + ":" + entity.getAttribute(Attributes.HTTP_PORT);
-    }
-    
-    public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
-        @Override protected void onEntityChange(Entity member) {
-            ((ElasticSearchClusterImpl)entity).resetCluster();
-        }
-        @Override protected void onEntityAdded(Entity member) {
-            ((ElasticSearchClusterImpl)entity).resetCluster();
-        }
-        @Override protected void onEntityRemoved(Entity member) {
-            ((ElasticSearchClusterImpl)entity).resetCluster();
-        }
-    };
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/20507af0/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
index e9ec525..718983a 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
@@ -20,7 +20,7 @@ import brooklyn.util.flags.SetFromFlag;
 @ImplementedBy(ElasticSearchNodeImpl.class)
 public interface ElasticSearchNode extends SoftwareProcess, DatastoreMixins.HasDatastoreUrl {
     @SetFromFlag("version")
-    ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "1.2.0");
+    ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "1.2.1");
     
     @SetFromFlag("downloadUrl")
     BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>(
@@ -63,5 +63,4 @@ public interface ElasticSearchNode extends SoftwareProcess, DatastoreMixins.HasD
     AttributeSensor<Integer> SEARCH_QUERY_TOTAL = Sensors.newIntegerSensor("elasticsearch.node.search.query.total");
     AttributeSensor<Integer> SEARCH_QUERY_TIME_IN_MILLIS = Sensors.newIntegerSensor("elasticsearch.node.search.query.time.in.millis");
     
-    void resetCluster(String nodeList);
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/20507af0/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
index fafe67f..ec01539 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
@@ -96,21 +96,6 @@ public class ElasticSearchNodeImpl extends SoftwareProcessImpl implements Elasti
     }
     
     @Override
-    public void resetCluster(String nodeList) {
-        URI updateClusterUri;
-        try {
-            updateClusterUri = new URI(String.format("http://%s:%s/_cluster/settings", getAttribute(Attributes.HOSTNAME), getAttribute(HTTP_PORT)));
-        } catch (URISyntaxException e) {
-            throw Exceptions.propagate(e);
-        }
-        HttpClient client = HttpTool.httpClientBuilder().build();
-        
-        String payload = String.format("{\"persistent\":{\"discovery.zen.ping.unicast.hosts\":\"%s\"}}", nodeList);
-        
-        HttpToolResponse result = HttpTool.httpPut(client, updateClusterUri, ImmutableMap.<String, String>of(), Strings.toByteArray(payload));
-    }
-    
-    @Override
     protected void disconnectSensors() {
         if (httpFeed != null) {
             httpFeed.stop();


[8/8] git commit: This closes #5

Posted by al...@apache.org.
This closes #5


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/2ebc307a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/2ebc307a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/2ebc307a

Branch: refs/heads/master
Commit: 2ebc307a34caa126a5b43e98bbef5e094bc2eb44
Parents: 65bc06a 25ef693
Author: Aled Sage <al...@gmail.com>
Authored: Wed Jul 9 15:42:15 2014 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Wed Jul 9 15:42:15 2014 +0100

----------------------------------------------------------------------
 .../elasticsearch/ElasticSearchCluster.java     |  18 +++
 .../elasticsearch/ElasticSearchClusterImpl.java |  37 ++++++
 .../nosql/elasticsearch/ElasticSearchNode.java  |  66 ++++++++++
 .../elasticsearch/ElasticSearchNodeDriver.java  |   7 ++
 .../elasticsearch/ElasticSearchNodeImpl.java    |  92 ++++++++++++++
 .../ElasticSearchNodeSshDriver.java             | 119 +++++++++++++++++++
 .../ElasticSearchClusterIntegrationTest.java    | 102 ++++++++++++++++
 .../ElasticSearchNodeIntegrationTest.java       |  94 +++++++++++++++
 8 files changed, 535 insertions(+)
----------------------------------------------------------------------



[7/8] git commit: Updated following PR review, added integration tests

Posted by al...@apache.org.
Updated following PR review, added integration tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/25ef6932
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/25ef6932
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/25ef6932

Branch: refs/heads/master
Commit: 25ef69322f6a4e065d53e4064ef36c8fb75714d5
Parents: 6572c02
Author: Martin Harris <gi...@nakomis.com>
Authored: Fri Jun 20 13:54:53 2014 +0100
Committer: Martin Harris <gi...@nakomis.com>
Committed: Fri Jun 20 13:54:53 2014 +0100

----------------------------------------------------------------------
 .../elasticsearch/ElasticSearchClusterImpl.java |   3 +-
 .../ElasticSearchNodeSshDriver.java             |  18 ++--
 .../ElasticSearchClusterIntegrationTest.java    | 102 +++++++++++++++++++
 .../ElasticSearchNodeIntegrationTest.java       |  94 +++++++++++++++++
 4 files changed, 206 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/25ef6932/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java
index 32f8706..dd2a7b4 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java
@@ -21,8 +21,7 @@ public class ElasticSearchClusterImpl extends DynamicClusterImpl implements Elas
     
     @Override
     protected EntitySpec<?> getMemberSpec() {
-        @SuppressWarnings("unchecked")
-        EntitySpec<ElasticSearchNode> spec = (EntitySpec<ElasticSearchNode>)getConfig(MEMBER_SPEC, EntitySpec.create(ElasticSearchNode.class));
+        EntitySpec<?> spec = EntitySpec.create(getConfig(MEMBER_SPEC, EntitySpec.create(ElasticSearchNode.class)));
         
         spec.configure(ElasticSearchNode.CLUSTER_NAME, getConfig(ElasticSearchClusterImpl.CLUSTER_NAME))
             .configure(ElasticSearchNode.NODE_NAME, "elasticsearch-" + nextMemberId.incrementAndGet());

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/25ef6932/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
index 748975b..89e575e 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
@@ -67,12 +67,12 @@ public class ElasticSearchNodeSshDriver extends AbstractSoftwareProcessSshDriver
         if (entity.getConfig(ElasticSearchNode.TEMPLATE_CONFIGURATION_URL) != null) {
             commandBuilder.append(" -Des.config=" + Os.mergePaths(getRunDir(), getConfigFile()));
         }
-        appendConfigIfPresent(commandBuilder, ElasticSearchNode.DATA_DIR, "es.path.data", Os.mergePaths(getRunDir(), "data"));
-        appendConfigIfPresent(commandBuilder, ElasticSearchNode.LOG_DIR, "es.path.logs", Os.mergePaths(getRunDir(), "logs"));
-        appendConfigIfPresent(commandBuilder, ElasticSearchNode.NODE_NAME.getConfigKey(), "es.node.name");
-        appendConfigIfPresent(commandBuilder, ElasticSearchNode.CLUSTER_NAME.getConfigKey(), "es.cluster.name");
-        appendConfigIfPresent(commandBuilder, ElasticSearchNode.MULTICAST_ENABLED, "es.discovery.zen.ping.multicast.enabled");
-        appendConfigIfPresent(commandBuilder, ElasticSearchNode.UNICAST_ENABLED, "es.discovery.zen.ping.unicast.enabled");
+        appendConfigIfPresent(commandBuilder, "es.path.data", ElasticSearchNode.DATA_DIR, Os.mergePaths(getRunDir(), "data"));
+        appendConfigIfPresent(commandBuilder, "es.path.logs", ElasticSearchNode.LOG_DIR, Os.mergePaths(getRunDir(), "logs"));
+        appendConfigIfPresent(commandBuilder, "es.node.name", ElasticSearchNode.NODE_NAME.getConfigKey());
+        appendConfigIfPresent(commandBuilder, "es.cluster.name", ElasticSearchNode.CLUSTER_NAME.getConfigKey());
+        appendConfigIfPresent(commandBuilder, "es.discovery.zen.ping.multicast.enabled", ElasticSearchNode.MULTICAST_ENABLED);
+        appendConfigIfPresent(commandBuilder, "es.discovery.zen.ping.unicast.enabled", ElasticSearchNode.UNICAST_ENABLED);
         commandBuilder.append(" > out.log 2> err.log < /dev/null");
         newScript(MutableMap.of("usePidFile", false), LAUNCHING)
             .updateTaskAndFailOnNonZeroResultCode()
@@ -80,11 +80,11 @@ public class ElasticSearchNodeSshDriver extends AbstractSoftwareProcessSshDriver
             .execute();
     }
     
-    private void appendConfigIfPresent(StringBuilder builder, ConfigKey<?> configKey, String parameter) {
-        appendConfigIfPresent(builder, configKey, parameter, null);
+    private void appendConfigIfPresent(StringBuilder builder, String parameter, ConfigKey<?> configKey) {
+        appendConfigIfPresent(builder, parameter, configKey, null);
     }
     
-    private void appendConfigIfPresent(StringBuilder builder, ConfigKey<?> configKey, String parameter, String defaultValue) {
+    private void appendConfigIfPresent(StringBuilder builder, String parameter, ConfigKey<?> configKey, String defaultValue) {
         String config = null;
         if (entity.getConfig(configKey) != null) {
             config = String.valueOf(entity.getConfig(configKey));

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/25ef6932/software/nosql/src/test/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterIntegrationTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterIntegrationTest.java
new file mode 100644
index 0000000..4ec0260
--- /dev/null
+++ b/software/nosql/src/test/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterIntegrationTest.java
@@ -0,0 +1,102 @@
+package brooklyn.entity.nosql.elasticsearch;
+
+import static org.testng.Assert.assertEquals;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.http.client.methods.HttpGet;
+import org.bouncycastle.util.Strings;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.trait.Startable;
+import brooklyn.event.feed.http.HttpValueFunctions;
+import brooklyn.location.Location;
+import brooklyn.location.basic.LocalhostMachineProvisioningLocation;
+import brooklyn.test.EntityTestUtils;
+import brooklyn.test.entity.TestApplication;
+import brooklyn.util.http.HttpTool;
+import brooklyn.util.http.HttpToolResponse;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public class ElasticSearchClusterIntegrationTest {
+
+    protected TestApplication app;
+    protected Location testLocation;
+    protected ElasticSearchCluster elasticSearchCluster;
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup() throws Exception {
+        app = ApplicationBuilder.newManagedApp(TestApplication.class);
+        testLocation = new LocalhostMachineProvisioningLocation();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void shutdown() {
+        Entities.destroyAll(app.getManagementContext());
+    }
+    
+    @Test(groups = {"Integration"})
+    public void testStartupAndShutdown() {
+        elasticSearchCluster = app.createAndManageChild(EntitySpec.create(ElasticSearchCluster.class)
+                .configure(DynamicCluster.INITIAL_SIZE, 3));
+        app.start(ImmutableList.of(testLocation));
+        
+        EntityTestUtils.assertAttributeEqualsEventually(elasticSearchCluster, Startable.SERVICE_UP, true);
+        
+        elasticSearchCluster.stop();
+        
+        EntityTestUtils.assertAttributeEqualsEventually(elasticSearchCluster, Startable.SERVICE_UP, false);
+    }
+    
+    @Test(groups = {"Integration"})
+    public void testPutAndGet() throws URISyntaxException {
+        elasticSearchCluster = app.createAndManageChild(EntitySpec.create(ElasticSearchCluster.class)
+                .configure(DynamicCluster.INITIAL_SIZE, 3));
+        app.start(ImmutableList.of(testLocation));
+        
+        EntityTestUtils.assertAttributeEqualsEventually(elasticSearchCluster, Startable.SERVICE_UP, true);
+        
+        assertEquals(elasticSearchCluster.getMembers().size(), 3);
+        
+        ElasticSearchNode anyNode = (ElasticSearchNode)elasticSearchCluster.getMembers().iterator().next();
+        
+        String document = "{\"foo\" : \"bar\",\"baz\" : \"quux\"}";
+        
+        String putBaseUri = "http://" + anyNode.getAttribute(Attributes.HOSTNAME) + ":" + anyNode.getAttribute(Attributes.HTTP_PORT);
+        
+        HttpToolResponse putResponse = HttpTool.httpPut(
+                HttpTool.httpClientBuilder()
+                    .port(anyNode.getAttribute(Attributes.HTTP_PORT))
+                    .build(), 
+                new URI(putBaseUri + "/mydocuments/docs/1"), 
+                ImmutableMap.<String, String>of(), 
+                Strings.toByteArray(document)); 
+        assertEquals(putResponse.getResponseCode(), 201);
+        EntityTestUtils.assertAttributeEqualsEventually(anyNode, ElasticSearchNode.DOCUMENT_COUNT, 1);
+        
+        int totalDocumentCount = 0;
+        for (Entity entity : elasticSearchCluster.getMembers()) {
+            ElasticSearchNode node = (ElasticSearchNode)entity;
+            String getBaseUri = "http://" + node.getAttribute(Attributes.HOSTNAME) + ":" + node.getAttribute(Attributes.HTTP_PORT);
+            HttpToolResponse getResponse = HttpTool.execAndConsume(
+                    HttpTool.httpClientBuilder().build(),
+                    new HttpGet(getBaseUri + "/mydocuments/docs/1/_source"));
+            assertEquals(getResponse.getResponseCode(), 200);
+            assertEquals(HttpValueFunctions.jsonContents("foo", String.class).apply(getResponse), "bar");
+            
+            totalDocumentCount += node.getAttribute(ElasticSearchNode.DOCUMENT_COUNT);
+        }
+        assertEquals(totalDocumentCount, 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/25ef6932/software/nosql/src/test/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeIntegrationTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeIntegrationTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeIntegrationTest.java
new file mode 100644
index 0000000..25b9522
--- /dev/null
+++ b/software/nosql/src/test/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeIntegrationTest.java
@@ -0,0 +1,94 @@
+package brooklyn.entity.nosql.elasticsearch;
+
+import static org.testng.Assert.assertEquals;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.http.client.methods.HttpGet;
+import org.bouncycastle.util.Strings;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.trait.Startable;
+import brooklyn.event.feed.http.HttpValueFunctions;
+import brooklyn.location.Location;
+import brooklyn.location.basic.LocalhostMachineProvisioningLocation;
+import brooklyn.test.EntityTestUtils;
+import brooklyn.test.entity.TestApplication;
+import brooklyn.util.http.HttpTool;
+import brooklyn.util.http.HttpToolResponse;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public class ElasticSearchNodeIntegrationTest {
+    
+    protected TestApplication app;
+    protected Location testLocation;
+    protected ElasticSearchNode elasticSearchNode;
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup() throws Exception {
+        app = ApplicationBuilder.newManagedApp(TestApplication.class);
+        testLocation = new LocalhostMachineProvisioningLocation();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void shutdown() {
+        Entities.destroyAll(app.getManagementContext());
+    }
+    
+    @Test(groups = {"Integration"})
+    public void testStartupAndShutdown() {
+        elasticSearchNode = app.createAndManageChild(EntitySpec.create(ElasticSearchNode.class));
+        app.start(ImmutableList.of(testLocation));
+        
+        EntityTestUtils.assertAttributeEqualsEventually(elasticSearchNode, Startable.SERVICE_UP, true);
+        
+        elasticSearchNode.stop();
+        
+        EntityTestUtils.assertAttributeEqualsEventually(elasticSearchNode, Startable.SERVICE_UP, false);
+    }
+    
+    @Test(groups = {"Integration"})
+    public void testDocumentCount() throws URISyntaxException {
+        elasticSearchNode = app.createAndManageChild(EntitySpec.create(ElasticSearchNode.class));
+        app.start(ImmutableList.of(testLocation));
+        
+        EntityTestUtils.assertAttributeEqualsEventually(elasticSearchNode, Startable.SERVICE_UP, true);
+        
+        EntityTestUtils.assertAttributeEquals(elasticSearchNode, ElasticSearchNode.DOCUMENT_COUNT, 0);
+        
+        String baseUri = "http://" + elasticSearchNode.getAttribute(Attributes.HOSTNAME) + ":" + elasticSearchNode.getAttribute(Attributes.HTTP_PORT);
+        
+        HttpToolResponse pingResponse = HttpTool.execAndConsume(
+                HttpTool.httpClientBuilder().build(),
+                new HttpGet(baseUri));
+        assertEquals(pingResponse.getResponseCode(), 200);
+        
+        String document = "{\"foo\" : \"bar\",\"baz\" : \"quux\"}";
+        
+        HttpToolResponse putResponse = HttpTool.httpPut(
+                HttpTool.httpClientBuilder()
+                    .port(elasticSearchNode.getAttribute(Attributes.HTTP_PORT))
+                    .build(), 
+                new URI(baseUri + "/mydocuments/docs/1"), 
+                ImmutableMap.<String, String>of(), 
+                Strings.toByteArray(document)); 
+        assertEquals(putResponse.getResponseCode(), 201);
+        
+        HttpToolResponse getResponse = HttpTool.execAndConsume(
+                HttpTool.httpClientBuilder().build(),
+                new HttpGet(baseUri + "/mydocuments/docs/1/_source"));
+        assertEquals(getResponse.getResponseCode(), 200);
+        assertEquals(HttpValueFunctions.jsonContents("foo", String.class).apply(getResponse), "bar");
+        
+        EntityTestUtils.assertAttributeEqualsEventually(elasticSearchNode, ElasticSearchNode.DOCUMENT_COUNT, 1);
+    }
+}


[5/8] git commit: Added support for elastic search cluster

Posted by al...@apache.org.
Added support for elastic search cluster


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/587e6999
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/587e6999
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/587e6999

Branch: refs/heads/master
Commit: 587e6999d12bdf71d53dc0cb553eb31927689e88
Parents: e409c68
Author: Martin Harris <gi...@nakomis.com>
Authored: Wed Jun 4 12:17:22 2014 +0100
Committer: Martin Harris <gi...@nakomis.com>
Committed: Tue Jun 17 10:19:58 2014 +0100

----------------------------------------------------------------------
 .../elasticsearch/ElasticSearchCluster.java     |  22 ++++
 .../elasticsearch/ElasticSearchClusterImpl.java | 104 +++++++++++++++++++
 .../nosql/elasticsearch/ElasticSearchNode.java  |  30 ++++--
 .../elasticsearch/ElasticSearchNodeImpl.java    |  29 +++++-
 .../ElasticSearchNodeSshDriver.java             |  12 ++-
 5 files changed, 181 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/587e6999/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java
new file mode 100644
index 0000000..4f6448f
--- /dev/null
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java
@@ -0,0 +1,22 @@
+package brooklyn.entity.nosql.elasticsearch;
+
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.Sensors;
+import brooklyn.util.flags.SetFromFlag;
+
+/**
+ * A cluster of {@link ElasticSearchNode}s based on {@link DynamicCluster} which can be resized by a policy if required.
+ */
+@ImplementedBy(ElasticSearchClusterImpl.class)
+public interface ElasticSearchCluster extends DynamicCluster {
+    @SetFromFlag("clusterName")
+    BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = new BasicAttributeSensorAndConfigKey<String>(String.class, 
+            "elasticsearch.cluster.name", "Name of the ElasticSearch cluster", "BrooklynCluster");
+    
+    AttributeSensor<String> NODE_LIST = Sensors.newStringSensor("elasticsearch.cluster.node.list", "Comma delimited list of nodes in hostname:port format");
+    
+    String getName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/587e6999/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java
new file mode 100644
index 0000000..5e1deb6
--- /dev/null
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java
@@ -0,0 +1,104 @@
+package brooklyn.entity.nosql.elasticsearch;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
+import brooklyn.entity.group.DynamicClusterImpl;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.location.Location;
+import brooklyn.policy.PolicySpec;
+import brooklyn.util.text.Strings;
+
+public class ElasticSearchClusterImpl extends DynamicClusterImpl implements ElasticSearchCluster {
+    
+    private AtomicInteger nextMemberId = new AtomicInteger(0);
+    private MemberTrackingPolicy policy;
+    
+    public ElasticSearchClusterImpl() {
+        
+    }
+    
+    @Override
+    public void init() {
+        policy = addPolicy(PolicySpec.create(MemberTrackingPolicy.class)
+                .displayName(getName() + " membership tracker")
+                .configure("group", this) 
+                .configure(AbstractMembershipTrackingPolicy.NOTIFY_ON_DUPLICATES, false));
+    }
+    
+    @Override
+    public void start(Collection<? extends Location> locations) {
+        super.start(locations);
+    }
+    
+    @Override
+    public void stop() {
+        if (policy != null) {
+            removePolicy(policy);
+        }
+        super.stop();
+    }
+
+    @Override
+    protected boolean calculateServiceUp() {
+        boolean up = false;
+        for (Entity member : getMembers()) {
+            if (Boolean.TRUE.equals(member.getAttribute(SERVICE_UP))) up = true;
+        }
+        return up;
+    }
+    
+    @Override
+    protected EntitySpec<?> getMemberSpec() {
+        @SuppressWarnings("unchecked")
+        EntitySpec<ElasticSearchNode> spec = (EntitySpec<ElasticSearchNode>)getConfig(MEMBER_SPEC, EntitySpec.create(ElasticSearchNode.class));
+        
+        spec.configure(ElasticSearchNode.CLUSTER_NAME, getConfig(ElasticSearchClusterImpl.CLUSTER_NAME))
+            .configure(ElasticSearchNode.MULTICAST_ENABLED, false)
+            .configure(ElasticSearchNode.UNICAST_ENABLED, false)
+            .configure(ElasticSearchNode.NODE_NAME, "elasticsearch-" + nextMemberId.incrementAndGet());
+        
+        return spec;
+    }
+    
+    @Override
+    public String getName() {
+        return getConfig(CLUSTER_NAME);
+    }
+    
+    /** Pushes each member the host:port list of its peers and publishes the full list as NODE_LIST. */
+    private void resetCluster() {
+        StringBuilder allNodes = new StringBuilder();
+        for (Entity member : getMembers()) {
+            StringBuilder peers = new StringBuilder();
+            for (Entity other : getMembers()) {
+                // Skip the member itself; deleting its "host:port," text also clipped hosts ending in the same text.
+                if (!other.equals(member)) peers.append(peers.length() > 0 ? "," : "").append(getHostAndPort(other));
+            }
+            if (peers.length() > 0) {
+                ((ElasticSearchNode)member).resetCluster(peers.toString());
+            }
+            allNodes.append(allNodes.length() > 0 ? "," : "").append(getHostAndPort(member));
+        }
+        setAttribute(NODE_LIST, allNodes.toString());
+    }
+    
+    private String getHostAndPort(Entity entity) {
+        return entity.getAttribute(Attributes.HOSTNAME) + ":" + entity.getAttribute(Attributes.HTTP_PORT);
+    }
+    
+    public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
+        @Override protected void onEntityChange(Entity member) {
+            ((ElasticSearchClusterImpl)entity).resetCluster();
+        }
+        @Override protected void onEntityAdded(Entity member) {
+            ((ElasticSearchClusterImpl)entity).resetCluster();
+        }
+        @Override protected void onEntityRemoved(Entity member) {
+            ((ElasticSearchClusterImpl)entity).resetCluster();
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/587e6999/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
index 1eb4fda..e9ec525 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
@@ -27,14 +27,22 @@ public interface ElasticSearchNode extends SoftwareProcess, DatastoreMixins.HasD
             SoftwareProcess.DOWNLOAD_URL, "https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-${version}.tar.gz");
     
     @SetFromFlag("dataDir")
-    ConfigKey<String> DATA_DIR = ConfigKeys.newStringConfigKey("elasticsearch.data.dir", "Directory for writing data files", null);
+    ConfigKey<String> DATA_DIR = ConfigKeys.newStringConfigKey("elasticsearch.node.data.dir", "Directory for writing data files", null);
     
     @SetFromFlag("logDir")
-    ConfigKey<String> LOG_DIR = ConfigKeys.newStringConfigKey("elasticsearch.log.dir", "Directory for writing log files", null);
+    ConfigKey<String> LOG_DIR = ConfigKeys.newStringConfigKey("elasticsearch.node.log.dir", "Directory for writing log files", null);
     
     @SetFromFlag("configFileUrl")
     ConfigKey<String> TEMPLATE_CONFIGURATION_URL = ConfigKeys.newStringConfigKey(
-            "elasticsearch.template.configuration.url", "URL where the elasticsearch configuration file (in freemarker format) can be found");
+            "elasticsearch.node.template.configuration.url", "URL where the elasticsearch configuration file (in freemarker format) can be found", null);
+    
+    @SetFromFlag("multicastEnabled")
+    ConfigKey<Boolean> MULTICAST_ENABLED = ConfigKeys.newBooleanConfigKey("elasticsearch.node.multicast.enabled", 
+            "Indicates whether zen discovery multicast should be enabled for a node", null);
+    
+    @SetFromFlag("unicastEnabled") // own flag name; "multicastEnabled" is already used by MULTICAST_ENABLED
+    ConfigKey<Boolean> UNICAST_ENABLED = ConfigKeys.newBooleanConfigKey("elasticsearch.node.UNicast.enabled", 
+            "Indicates whether zen discovery unicast should be enabled for a node", null);
     
     @SetFromFlag("httpPort")
     PortAttributeSensorAndConfigKey HTTP_PORT = new PortAttributeSensorAndConfigKey(WebAppServiceConstants.HTTP_PORT, PortRanges.fromString("9200+"));
@@ -44,14 +52,16 @@ public interface ElasticSearchNode extends SoftwareProcess, DatastoreMixins.HasD
             "Node name (or randomly selected if not set", null);
     
     @SetFromFlag("clusterName")
-    StringAttributeSensorAndConfigKey CLUSTER_NAME = new StringAttributeSensorAndConfigKey("elasticsearch.cluster.name", 
+    StringAttributeSensorAndConfigKey CLUSTER_NAME = new StringAttributeSensorAndConfigKey("elasticsearch.node.cluster.name", 
             "Cluster name (or elasticsearch selected if not set", null);
     
     AttributeSensor<String> NODE_ID = Sensors.newStringSensor("elasticsearch.node.id");
-    AttributeSensor<Integer> DOCUMENT_COUNT = Sensors.newIntegerSensor("elasticsearch.docs.count");
-    AttributeSensor<Integer> STORE_BYTES = Sensors.newIntegerSensor("elasticsearch.store.bytes");
-    AttributeSensor<Integer> GET_TOTAL = Sensors.newIntegerSensor("elasticsearch.get.total");
-    AttributeSensor<Integer> GET_TIME_IN_MILLIS = Sensors.newIntegerSensor("elasticsearch.get.time.in.millis");
-    AttributeSensor<Integer> SEARCH_QUERY_TOTAL = Sensors.newIntegerSensor("elasticsearch.search.query.total");
-    AttributeSensor<Integer> SEARCH_QUERY_TIME_IN_MILLIS = Sensors.newIntegerSensor("elasticsearch.search.query.time.in.millis");
+    AttributeSensor<Integer> DOCUMENT_COUNT = Sensors.newIntegerSensor("elasticsearch.node.docs.count");
+    AttributeSensor<Integer> STORE_BYTES = Sensors.newIntegerSensor("elasticsearch.node.store.bytes");
+    AttributeSensor<Integer> GET_TOTAL = Sensors.newIntegerSensor("elasticsearch.node.get.total");
+    AttributeSensor<Integer> GET_TIME_IN_MILLIS = Sensors.newIntegerSensor("elasticsearch.node.get.time.in.millis");
+    AttributeSensor<Integer> SEARCH_QUERY_TOTAL = Sensors.newIntegerSensor("elasticsearch.node.search.query.total");
+    AttributeSensor<Integer> SEARCH_QUERY_TIME_IN_MILLIS = Sensors.newIntegerSensor("elasticsearch.node.search.query.time.in.millis");
+    
+    void resetCluster(String nodeList);
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/587e6999/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
index 062e14b..fafe67f 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
@@ -1,16 +1,27 @@
 package brooklyn.entity.nosql.elasticsearch;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.http.client.HttpClient;
+import org.bouncycastle.util.Strings;
+
+import brooklyn.entity.basic.Attributes;
 import brooklyn.entity.basic.SoftwareProcessImpl;
 import brooklyn.event.feed.http.HttpFeed;
 import brooklyn.event.feed.http.HttpPollConfig;
 import brooklyn.event.feed.http.HttpValueFunctions;
 import brooklyn.event.feed.http.JsonFunctions;
 import brooklyn.location.access.BrooklynAccessUtils;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.http.HttpTool;
 import brooklyn.util.http.HttpToolResponse;
 
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.net.HostAndPort;
 import com.google.gson.JsonElement;
 
@@ -61,9 +72,6 @@ public class ElasticSearchNodeImpl extends SoftwareProcessImpl implements Elasti
             .poll(new HttpPollConfig<String>(NODE_NAME)
                 .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("name"), JsonFunctions.cast(String.class)))
                 .onFailureOrException(Functions.<String>constant(null)))
-            .poll(new HttpPollConfig<String>(CLUSTER_NAME)
-                .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("settings", "cluster", "name"), JsonFunctions.cast(String.class)))
-                .onFailureOrException(Functions.<String>constant(null)))
             .poll(new HttpPollConfig<Integer>(DOCUMENT_COUNT)
                 .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "docs", "count"), JsonFunctions.cast(Integer.class)))
                 .onFailureOrException(Functions.<Integer>constant(null)))
@@ -88,6 +96,21 @@ public class ElasticSearchNodeImpl extends SoftwareProcessImpl implements Elasti
     }
     
     @Override
+    public void resetCluster(String nodeList) {
+        URI updateClusterUri;
+        try {
+            updateClusterUri = new URI(String.format("http://%s:%s/_cluster/settings", getAttribute(Attributes.HOSTNAME), getAttribute(HTTP_PORT)));
+        } catch (URISyntaxException e) {
+            throw Exceptions.propagate(e);
+        }
+        HttpClient client = HttpTool.httpClientBuilder().build();
+        
+        String payload = String.format("{\"persistent\":{\"discovery.zen.ping.unicast.hosts\":\"%s\"}}", nodeList);
+        
+        HttpToolResponse result = HttpTool.httpPut(client, updateClusterUri, ImmutableMap.<String, String>of(), Strings.toByteArray(payload));
+    }
+    
+    @Override
     protected void disconnectSensors() {
         if (httpFeed != null) {
             httpFeed.stop();

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/587e6999/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
index 3a695cb..2dda090 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
@@ -33,6 +33,7 @@ public class ElasticSearchNodeSshDriver extends AbstractSoftwareProcessSshDriver
         
         List<String> commands = ImmutableList.<String>builder()
             .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs))
+            .add(BashCommands.installJavaLatestOrFail())
             .add(String.format("tar zxvf %s", saveAs))
             .build();
         
@@ -70,6 +71,8 @@ public class ElasticSearchNodeSshDriver extends AbstractSoftwareProcessSshDriver
         appendConfigIfPresent(commandBuilder, ElasticSearchNode.LOG_DIR, "es.path.logs", Os.mergePaths(getRunDir(), "logs"));
         appendConfigIfPresent(commandBuilder, ElasticSearchNode.NODE_NAME.getConfigKey(), "es.node.name");
         appendConfigIfPresent(commandBuilder, ElasticSearchNode.CLUSTER_NAME.getConfigKey(), "es.cluster.name");
+        appendConfigIfPresent(commandBuilder, ElasticSearchNode.MULTICAST_ENABLED, "es.discovery.zen.ping.multicast.enabled");
+        appendConfigIfPresent(commandBuilder, ElasticSearchNode.UNICAST_ENABLED, "es.discovery.zen.ping.unicast.enabled");
         commandBuilder.append(" > out.log 2> err.log < /dev/null");
         newScript(MutableMap.of("usePidFile", false), LAUNCHING)
             .updateTaskAndFailOnNonZeroResultCode()
@@ -77,12 +80,15 @@ public class ElasticSearchNodeSshDriver extends AbstractSoftwareProcessSshDriver
             .execute();
     }
     
-    private void appendConfigIfPresent(StringBuilder builder, ConfigKey<String> configKey, String parameter) {
+    private void appendConfigIfPresent(StringBuilder builder, ConfigKey<?> configKey, String parameter) {
         appendConfigIfPresent(builder, configKey, parameter, null);
     }
     
-    private void appendConfigIfPresent(StringBuilder builder, ConfigKey<String> configKey, String parameter, String defaultValue) {
-        String config = entity.getConfig(configKey);
+    private void appendConfigIfPresent(StringBuilder builder, ConfigKey<?> configKey, String parameter, String defaultValue) {
+        String config = null;
+        if (entity.getConfig(configKey) != null) {
+            config = String.valueOf(entity.getConfig(configKey));
+        }
         if (config == null && defaultValue != null) {
             config = defaultValue;
         }


[2/8] git commit: Created basic elasticsearch node

Posted by al...@apache.org.
Created basic elasticsearch node


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/b158aa31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/b158aa31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/b158aa31

Branch: refs/heads/master
Commit: b158aa319be25be04c80acc442616c56fa74cdb5
Parents: c2faca4
Author: Martin Harris <gi...@nakomis.com>
Authored: Thu May 29 17:38:50 2014 +0100
Committer: Martin Harris <gi...@nakomis.com>
Committed: Tue Jun 17 10:19:57 2014 +0100

----------------------------------------------------------------------
 .../nosql/elasticsearch/ElasticSearchNode.java  | 34 +++++++++
 .../elasticsearch/ElasticSearchNodeDriver.java  |  7 ++
 .../elasticsearch/ElasticSearchNodeImpl.java    | 72 ++++++++++++++++++++
 .../ElasticSearchNodeSshDriver.java             | 70 +++++++++++++++++++
 4 files changed, 183 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/b158aa31/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
new file mode 100644
index 0000000..30d80e9
--- /dev/null
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
@@ -0,0 +1,34 @@
+package brooklyn.entity.nosql.elasticsearch;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.database.DatastoreMixins;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.entity.webapp.WebAppServiceConstants;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+import brooklyn.event.basic.Sensors;
+import brooklyn.location.basic.PortRanges;
+import brooklyn.util.flags.SetFromFlag;
+
+/**
+ * An {@link brooklyn.entity.Entity} that represents an ElasticSearch node
+ */
+@ImplementedBy(ElasticSearchNodeImpl.class)
+public interface ElasticSearchNode extends SoftwareProcess, DatastoreMixins.HasDatastoreUrl {
+    @SetFromFlag("version")
+    ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "1.2.0");
+    
+    @SetFromFlag("downloadUrl")
+    BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>(
+            SoftwareProcess.DOWNLOAD_URL, "https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-${version}.tar.gz");
+    
+    @SetFromFlag("httpPort")
+    PortAttributeSensorAndConfigKey HTTP_PORT = new PortAttributeSensorAndConfigKey(WebAppServiceConstants.HTTP_PORT, PortRanges.fromString("9200+"));
+    
+    AttributeSensor<String> NODE_ID = Sensors.newStringSensor("elasticsearch.node.id");
+    AttributeSensor<String> NODE_NAME = Sensors.newStringSensor("elasticsearch.node.name");
+    AttributeSensor<String> CLUSTER_NAME = Sensors.newStringSensor("elasticsearch.cluster.name");
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/b158aa31/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeDriver.java
new file mode 100644
index 0000000..327640f
--- /dev/null
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeDriver.java
@@ -0,0 +1,7 @@
+package brooklyn.entity.nosql.elasticsearch;
+
+import brooklyn.entity.basic.SoftwareProcessDriver;
+
+public interface ElasticSearchNodeDriver extends SoftwareProcessDriver {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/b158aa31/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
new file mode 100644
index 0000000..92c809c
--- /dev/null
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
@@ -0,0 +1,72 @@
+package brooklyn.entity.nosql.elasticsearch;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.event.feed.http.HttpFeed;
+import brooklyn.event.feed.http.HttpPollConfig;
+import brooklyn.event.feed.http.HttpValueFunctions;
+import brooklyn.event.feed.http.JsonFunctions;
+import brooklyn.location.access.BrooklynAccessUtils;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.net.HostAndPort;
+import com.google.gson.JsonElement;
+
+public class ElasticSearchNodeImpl extends SoftwareProcessImpl implements ElasticSearchNode {
+    
+    HttpFeed httpFeed;
+    
+    public ElasticSearchNodeImpl() {
+        
+    }
+
+    @Override
+    public Class<ElasticSearchNodeDriver> getDriverInterface() {
+        return ElasticSearchNodeDriver.class;
+    }
+    
+    @Override
+    protected void connectSensors() {
+        super.connectSensors();
+        Integer rawPort = getAttribute(HTTP_PORT);
+        checkNotNull(rawPort, "HTTP_PORT sensors not set for %s; is an acceptable port available?", this);
+        HostAndPort hp = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, rawPort);
+        Function<JsonElement, String> getNodeId = new Function<JsonElement, String>() {
+            @Override public String apply(JsonElement input) {
+                return input.getAsJsonObject().entrySet().iterator().next().getKey();
+            }
+        };
+        Function<JsonElement, JsonElement> getFirstNode = new Function<JsonElement, JsonElement>() {
+            @Override public JsonElement apply(JsonElement input) {
+                return input.getAsJsonObject().entrySet().iterator().next().getValue();
+            }
+        };
+        httpFeed = HttpFeed.builder()
+            .entity(this)
+            .period(1000)
+            .baseUri(String.format("http://%s:%s/_nodes/_local", hp.getHostText(), hp.getPort()))
+            .poll(new HttpPollConfig<Boolean>(SERVICE_UP)
+                .onSuccess(HttpValueFunctions.responseCodeEquals(200))
+                .onFailureOrException(Functions.constant(false)))
+                .poll(new HttpPollConfig<String>(NODE_ID)
+                        .onSuccess(HttpValueFunctions.chain(HttpValueFunctions.jsonContents(), JsonFunctions.walk("nodes"), getNodeId))
+                        .onFailureOrException(Functions.constant("")))
+            .poll(new HttpPollConfig<String>(NODE_NAME)
+                .onSuccess(HttpValueFunctions.chain(HttpValueFunctions.chain(HttpValueFunctions.jsonContents(), JsonFunctions.walk("nodes"), getFirstNode), 
+                        JsonFunctions.walk("name"), JsonFunctions.cast(String.class)))
+                .onFailureOrException(Functions.constant("")))
+            .poll(new HttpPollConfig<String>(CLUSTER_NAME)
+                .onSuccess(HttpValueFunctions.chain(HttpValueFunctions.chain(HttpValueFunctions.jsonContents(), JsonFunctions.walk("nodes"), getFirstNode), 
+                        JsonFunctions.walk("settings", "cluster", "name"), JsonFunctions.cast(String.class)))
+                .onFailureOrException(Functions.constant("")))
+            .build();
+    }
+    
+    @Override
+    protected void disconnectSensors() {
+        if (httpFeed != null) {
+            httpFeed.stop();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/b158aa31/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
new file mode 100644
index 0000000..f3846ca
--- /dev/null
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
@@ -0,0 +1,70 @@
+package brooklyn.entity.nosql.elasticsearch;
+
+import static java.lang.String.format;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.EntityLocal;
+import brooklyn.entity.drivers.downloads.DownloadResolver;
+import brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.ssh.BashCommands;
+
+public class ElasticSearchNodeSshDriver extends AbstractSoftwareProcessSshDriver implements ElasticSearchNodeDriver {
+
+    public ElasticSearchNodeSshDriver(EntityLocal entity, SshMachineLocation machine) {
+        super(entity, machine);
+    }
+
+    @Override
+    public void install() {
+        DownloadResolver resolver = Entities.newDownloader(this);
+        List<String> urls = resolver.getTargets();
+        String saveAs = resolver.getFilename();
+        
+        List<String> commands = ImmutableList.<String>builder()
+            .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs))
+            .add(String.format("tar zxvf %s", saveAs))
+            .build();
+        
+        newScript(INSTALLING).body.append(commands).execute();
+        
+        setExpandedInstallDir(getInstallDir() + "/" + resolver.getUnpackedDirectoryName(format("elasticsearch-%s", getVersion())));
+    }
+
+    @Override
+    public void customize() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void launch() {
+        String pidFile = getRunDir() + "/" + AbstractSoftwareProcessSshDriver.PID_FILENAME;
+        entity.setAttribute(ElasticSearchNode.PID_FILE, pidFile);
+        newScript(MutableMap.of("usePidFile", false), LAUNCHING)
+            .updateTaskAndFailOnNonZeroResultCode()
+            .body.append(String.format("%s/bin/elasticsearch -d -p %s > out.log 2> err.log < /dev/null", getExpandedInstallDir(), pidFile))
+            .execute();
+    }
+    
+    @Override
+    public boolean isRunning() {
+        return newScript(MutableMap.of("usePidFile", true), CHECK_RUNNING).execute() == 0;
+    }
+    
+    @Override
+    public void stop() {
+        newScript(MutableMap.of("usePidFile", true), STOPPING).execute();
+    }
+    
+    @Override
+    public void kill() {
+        newScript(MutableMap.of("usePidFile", true), KILLING).execute();
+    }
+
+}


[4/8] git commit: Added config keys required for clustering

Posted by al...@apache.org.
Added config keys required for clustering


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/e409c68d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/e409c68d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/e409c68d

Branch: refs/heads/master
Commit: e409c68dcbf3337a8f1e6adba9c87953eff9f691
Parents: 62c7d20
Author: Martin Harris <gi...@nakomis.com>
Authored: Fri May 30 14:52:54 2014 +0100
Committer: Martin Harris <gi...@nakomis.com>
Committed: Tue Jun 17 10:19:58 2014 +0100

----------------------------------------------------------------------
 .../nosql/elasticsearch/ElasticSearchNode.java  | 21 +++++++-
 .../elasticsearch/ElasticSearchNodeImpl.java    |  2 +
 .../ElasticSearchNodeSshDriver.java             | 51 ++++++++++++++++++--
 3 files changed, 68 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/e409c68d/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
index 32ec9af..1eb4fda 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java
@@ -8,6 +8,7 @@ import brooklyn.entity.proxying.ImplementedBy;
 import brooklyn.entity.webapp.WebAppServiceConstants;
 import brooklyn.event.AttributeSensor;
 import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey.StringAttributeSensorAndConfigKey;
 import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
 import brooklyn.event.basic.Sensors;
 import brooklyn.location.basic.PortRanges;
@@ -25,12 +26,28 @@ public interface ElasticSearchNode extends SoftwareProcess, DatastoreMixins.HasD
     BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>(
             SoftwareProcess.DOWNLOAD_URL, "https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-${version}.tar.gz");
     
+    @SetFromFlag("dataDir")
+    ConfigKey<String> DATA_DIR = ConfigKeys.newStringConfigKey("elasticsearch.data.dir", "Directory for writing data files", null);
+    
+    @SetFromFlag("logDir")
+    ConfigKey<String> LOG_DIR = ConfigKeys.newStringConfigKey("elasticsearch.log.dir", "Directory for writing log files", null);
+    
+    @SetFromFlag("configFileUrl")
+    ConfigKey<String> TEMPLATE_CONFIGURATION_URL = ConfigKeys.newStringConfigKey(
+            "elasticsearch.template.configuration.url", "URL where the elasticsearch configuration file (in freemarker format) can be found");
+    
     @SetFromFlag("httpPort")
     PortAttributeSensorAndConfigKey HTTP_PORT = new PortAttributeSensorAndConfigKey(WebAppServiceConstants.HTTP_PORT, PortRanges.fromString("9200+"));
     
+    @SetFromFlag("nodeName")
+    StringAttributeSensorAndConfigKey NODE_NAME = new StringAttributeSensorAndConfigKey("elasticsearch.node.name", 
+            "Node name (or randomly selected if not set", null);
+    
+    @SetFromFlag("clusterName")
+    StringAttributeSensorAndConfigKey CLUSTER_NAME = new StringAttributeSensorAndConfigKey("elasticsearch.cluster.name", 
+            "Cluster name (or elasticsearch selected if not set", null);
+    
     AttributeSensor<String> NODE_ID = Sensors.newStringSensor("elasticsearch.node.id");
-    AttributeSensor<String> NODE_NAME = Sensors.newStringSensor("elasticsearch.node.name");
-    AttributeSensor<String> CLUSTER_NAME = Sensors.newStringSensor("elasticsearch.cluster.name");
     AttributeSensor<Integer> DOCUMENT_COUNT = Sensors.newIntegerSensor("elasticsearch.docs.count");
     AttributeSensor<Integer> STORE_BYTES = Sensors.newIntegerSensor("elasticsearch.store.bytes");
     AttributeSensor<Integer> GET_TOTAL = Sensors.newIntegerSensor("elasticsearch.get.total");

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/e409c68d/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
index f25d0bf..062e14b 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java
@@ -82,6 +82,8 @@ public class ElasticSearchNodeImpl extends SoftwareProcessImpl implements Elasti
             .poll(new HttpPollConfig<Integer>(SEARCH_QUERY_TIME_IN_MILLIS)
                 .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "search", "query_time_in_millis"), JsonFunctions.cast(Integer.class)))
                 .onFailureOrException(Functions.<Integer>constant(null)))
+            .poll(new HttpPollConfig<String>(CLUSTER_NAME)
+                .onSuccess(HttpValueFunctions.jsonContents("cluster_name", String.class)))
             .build();
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/e409c68d/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
index f3846ca..3a695cb 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java
@@ -2,18 +2,23 @@ package brooklyn.entity.nosql.elasticsearch;
 
 import static java.lang.String.format;
 
+import java.io.Reader;
+import java.io.StringReader;
 import java.util.List;
 
-import com.google.common.collect.ImmutableList;
-
+import brooklyn.config.ConfigKey;
 import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver;
 import brooklyn.entity.basic.Entities;
 import brooklyn.entity.basic.EntityLocal;
 import brooklyn.entity.drivers.downloads.DownloadResolver;
 import brooklyn.location.basic.SshMachineLocation;
 import brooklyn.util.collections.MutableMap;
+import brooklyn.util.net.Urls;
+import brooklyn.util.os.Os;
 import brooklyn.util.ssh.BashCommands;
 
+import com.google.common.collect.ImmutableList;
+
 public class ElasticSearchNodeSshDriver extends AbstractSoftwareProcessSshDriver implements ElasticSearchNodeDriver {
 
     public ElasticSearchNodeSshDriver(EntityLocal entity, SshMachineLocation machine) {
@@ -38,20 +43,58 @@ public class ElasticSearchNodeSshDriver extends AbstractSoftwareProcessSshDriver
 
     @Override
     public void customize() {
-        // TODO Auto-generated method stub
+        newScript(CUSTOMIZING).execute();  //create the directory
+        
+        String configFileUrl = entity.getConfig(ElasticSearchNode.TEMPLATE_CONFIGURATION_URL);
+        
+        if (configFileUrl == null) {
+            return;
+        }
+
+        String configScriptContents = processTemplate(configFileUrl);
+        Reader configContents = new StringReader(configScriptContents);
 
+        getMachine().copyTo(configContents, Urls.mergePaths(getRunDir(), getConfigFile()));
     }
 
     @Override
     public void launch() {
         String pidFile = getRunDir() + "/" + AbstractSoftwareProcessSshDriver.PID_FILENAME;
         entity.setAttribute(ElasticSearchNode.PID_FILE, pidFile);
+        StringBuilder commandBuilder = new StringBuilder()
+            .append(String.format("%s/bin/elasticsearch -d -p %s", getExpandedInstallDir(), pidFile));
+        if (entity.getConfig(ElasticSearchNode.TEMPLATE_CONFIGURATION_URL) != null) {
+            commandBuilder.append(" -Des.config=" + Os.mergePaths(getRunDir(), getConfigFile()));
+        }
+        appendConfigIfPresent(commandBuilder, ElasticSearchNode.DATA_DIR, "es.path.data", Os.mergePaths(getRunDir(), "data"));
+        appendConfigIfPresent(commandBuilder, ElasticSearchNode.LOG_DIR, "es.path.logs", Os.mergePaths(getRunDir(), "logs"));
+        appendConfigIfPresent(commandBuilder, ElasticSearchNode.NODE_NAME.getConfigKey(), "es.node.name");
+        appendConfigIfPresent(commandBuilder, ElasticSearchNode.CLUSTER_NAME.getConfigKey(), "es.cluster.name");
+        commandBuilder.append(" > out.log 2> err.log < /dev/null");
         newScript(MutableMap.of("usePidFile", false), LAUNCHING)
             .updateTaskAndFailOnNonZeroResultCode()
-            .body.append(String.format("%s/bin/elasticsearch -d -p %s > out.log 2> err.log < /dev/null", getExpandedInstallDir(), pidFile))
+            .body.append(commandBuilder.toString())
             .execute();
     }
     
+    private void appendConfigIfPresent(StringBuilder builder, ConfigKey<String> configKey, String parameter) {
+        appendConfigIfPresent(builder, configKey, parameter, null);
+    }
+    
+    private void appendConfigIfPresent(StringBuilder builder, ConfigKey<String> configKey, String parameter, String defaultValue) {
+        String config = entity.getConfig(configKey);
+        if (config == null && defaultValue != null) {
+            config = defaultValue;
+        }
+        if (config != null) {
+            builder.append(String.format(" -D%s=%s", parameter, config));
+        }
+    }
+    
+    public String getConfigFile() {
+        return "elasticsearch.yaml";
+    }
+    
     @Override
     public boolean isRunning() {
         return newScript(MutableMap.of("usePidFile", true), CHECK_RUNNING).execute() == 0;