Posted to commits@zeppelin.apache.org by mo...@apache.org on 2017/01/29 23:04:51 UTC

zeppelin git commit: [ZEPPELIN-1821] Add HTTP client to elasticsearch interpreter

Repository: zeppelin
Updated Branches:
  refs/heads/master 940a8b7d3 -> e763b3bf3


[ZEPPELIN-1821] Add HTTP client to elasticsearch interpreter

### What is this PR for?
Add HTTP client to elasticsearch interpreter.

### What type of PR is it?
Feature

### Todos
* [X] - Source code
* [X] - Tests
* [X] - License
* [X] - Docs

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1821

### How should this be tested?
* Start an Elasticsearch node
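* Configure the elasticsearch interpreter to use the http client (set `elasticsearch.client.type` to `http` and `elasticsearch.port` to the node's HTTP port)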
* Create queries in a note using elasticsearch
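
The client selection that the second step relies on can be sketched roughly as follows. This is a minimal, standalone illustration and not the interpreter's code: the class `ClientTypeSketch`, the enum `ClientType`, and the method `resolve` are hypothetical names, and only the property key `elasticsearch.client.type` and the values `transport` and `http` come from this change. One deliberate difference: the sketch throws on an unknown value, whereas the interpreter in this commit logs an error and leaves the client unset.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical sketch of how a client type could be derived from the
// interpreter properties. Names other than the property key are assumptions.
class ClientTypeSketch {

  enum ClientType { TRANSPORT, HTTP }

  // Property key introduced by this change.
  static final String CLIENT_TYPE_KEY = "elasticsearch.client.type";

  // An unset or empty value falls back to the transport client,
  // which keeps the behaviour of older configurations.
  static ClientType resolve(Properties props) {
    final String raw = props.getProperty(CLIENT_TYPE_KEY);
    final String type = raw == null ? "" : raw.toLowerCase(Locale.ROOT);

    if (type.isEmpty() || "transport".equals(type)) {
      return ClientType.TRANSPORT;
    }
    if ("http".equals(type)) {
      return ClientType.HTTP;
    }
    // Assumption for the sketch: fail fast instead of logging.
    throw new IllegalArgumentException("Unknown type of Elasticsearch client: " + raw);
  }

  public static void main(String[] args) {
    final Properties props = new Properties();
    System.out.println(resolve(props));

    props.setProperty(CLIENT_TYPE_KEY, "http");
    System.out.println(resolve(props));
  }
}
```

Because the default stays on `transport`, existing interpreter settings keep working without any change; only notes that should go through HTTP need the new property.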

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? Yes
* Are there breaking changes for older versions? No
* Does this need documentation? Yes

Author: Bruno Bonnin <bb...@gmail.com>
Author: Bruno Bonnin <br...@myscript.com>

Closes #1902 from bbonnin/master and squashes the following commits:

f5a539e [Bruno Bonnin] Remove commented code lines
86153a8 [Bruno Bonnin] Merge remote-tracking branch 'upstream/master'
2e1bbbd [Bruno Bonnin] Merge remote-tracking branch 'upstream/master'
19e888e [Bruno Bonnin] Remove bad code in test
523d155 [Bruno Bonnin] Replace Java 8 methods
6bcf369 [Bruno Bonnin] Fix issue with id containing special chars (/, #)
4e9812e [Bruno Bonnin] Merge elasticsearch/pom.xml
5a96ae0 [Bruno Bonnin] Merge branch 'master' into master
e2365fb [Bruno Bonnin] Update elasticsearch/pom.xml
28b9805 [Bruno Bonnin] Update img
549db39 [Bruno Bonnin] Add HTTP client to elasticsearch interpreter
f4c5ac3 [Bruno Bonnin] HTTP-based Elasticsearch client
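
The fix for ids containing `/` (commit 6bcf369) can be illustrated with a small standalone sketch. It assumes the request path has already been split on `/`, so an id such as `a/b#1` arrives as several trailing segments that must be joined back together. The class and method names here are hypothetical, and `String.join` is used for brevity even though the committed code avoids Java 8 methods.

```java
import java.util.Arrays;

// Hypothetical sketch: rebuild index, type and id from path segments,
// treating every segment after the second as part of the id.
class IndexTypeIdSketch {

  // Returns {index, type, id}, or null when the path is incomplete.
  static String[] indexTypeId(String[] urlItems) {
    if (urlItems.length < 3) {
      return null;
    }

    final String index = urlItems[0];
    final String type = urlItems[1];
    // Re-join the remaining segments so that a '/' inside the id survives.
    final String id = String.join("/", Arrays.copyOfRange(urlItems, 2, urlItems.length));

    if (index.isEmpty() || type.isEmpty() || id.isEmpty()) {
      return null;
    }
    return new String[] { index, type, id };
  }

  public static void main(String[] args) {
    // Segments of a path like /logs/doc/a/b#1 after splitting on '/'.
    final String[] parts = indexTypeId(new String[] { "logs", "doc", "a", "b#1" });
    System.out.println(Arrays.toString(parts));
  }
}
```

Checking for at least three segments, rather than exactly three, is what lets such ids through; an HTTP-based client would additionally need to encode the id before placing it in a request URL.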


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/e763b3bf
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/e763b3bf
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/e763b3bf

Branch: refs/heads/master
Commit: e763b3bf3e8a26a2e2134bc615aac1bff59cd82d
Parents: 940a8b7
Author: Bruno Bonnin <bb...@gmail.com>
Authored: Sat Jan 28 10:16:43 2017 +0100
Committer: Lee moon soo <mo...@apache.org>
Committed: Mon Jan 30 08:04:49 2017 +0900

----------------------------------------------------------------------
 .../img/docs-img/elasticsearch-config.png       | Bin 150536 -> 55656 bytes
 docs/interpreter/elasticsearch.md               |  17 +-
 elasticsearch/pom.xml                           |  15 +-
 .../elasticsearch/ElasticsearchInterpreter.java | 370 ++++++++++--------
 .../elasticsearch/action/ActionException.java   |  32 ++
 .../elasticsearch/action/ActionResponse.java    |  78 ++++
 .../elasticsearch/action/AggWrapper.java        |  43 +++
 .../elasticsearch/action/HitWrapper.java        |  67 ++++
 .../client/ElasticsearchClient.java             |  36 ++
 .../elasticsearch/client/HttpBasedClient.java   | 372 +++++++++++++++++++
 .../client/TransportBasedClient.java            | 235 ++++++++++++
 .../src/main/resources/interpreter-setting.json |  18 +
 .../ElasticsearchInterpreterTest.java           | 183 ++++++---
 zeppelin-distribution/src/bin_license/LICENSE   |   2 +
 14 files changed, 1251 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-config.png
----------------------------------------------------------------------
diff --git a/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-config.png b/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-config.png
index b5f7dda..54a634a 100644
Binary files a/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-config.png and b/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-config.png differ

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/docs/interpreter/elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/elasticsearch.md b/docs/interpreter/elasticsearch.md
index 7f3fb27..165116b 100644
--- a/docs/interpreter/elasticsearch.md
+++ b/docs/interpreter/elasticsearch.md
@@ -46,7 +46,22 @@ limitations under the License.
   <tr>
     <td>elasticsearch.port</td>
     <td>9300</td>
-    <td>Connection port <b>( Important: this is not the HTTP port, but the transport port )</b></td>
+    <td>Connection port <b>( Important: it depends on the client type, transport or http)</b></td>
+  </tr>
+  <tr>
+    <td>elasticsearch.client.type</td>
+    <td>transport</td>
+    <td>The type of client for Elasticsearch (transport or http)<b>( Important: the port depends on this value)</b></td>
+  </tr>
+  <tr>
+    <td>elasticsearch.basicauth.username</td>
+    <td></td>
+    <td>Username for basic authentication (http)</td>
+  </tr>
+  <tr>
+    <td>elasticsearch.basicauth.password</td>
+    <td></td>
+    <td>Password for basic authentication (http)</td>
   </tr>
   <tr>
     <td>elasticsearch.result.size</td>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml
index 6f09c7d..6042a14 100644
--- a/elasticsearch/pom.xml
+++ b/elasticsearch/pom.xml
@@ -26,7 +26,6 @@
     <relativePath>..</relativePath>
   </parent>
 
-  <groupId>org.apache.zeppelin</groupId>
   <artifactId>zeppelin-elasticsearch</artifactId>
   <packaging>jar</packaging>
   <version>0.8.0-SNAPSHOT</version>
@@ -34,8 +33,10 @@
 
   <properties>
     <elasticsearch.version>2.4.3</elasticsearch.version>
+    <httpasyncclient.version>4.0.2</httpasyncclient.version>
     <guava.version>18.0</guava.version>
     <json-flattener.version>0.1.6</json-flattener.version>
+    <unirest.version>1.4.9</unirest.version>
   </properties>
 
   <dependencies>
@@ -51,6 +52,12 @@
       <artifactId>elasticsearch</artifactId>
       <version>${elasticsearch.version}</version>
     </dependency>
+    
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpasyncclient</artifactId>
+      <version>${httpasyncclient.version}</version>
+    </dependency>
 
     <dependency>
       <groupId>com.google.guava</groupId>
@@ -65,6 +72,12 @@
     </dependency>
     
     <dependency>
+      <groupId>com.mashape.unirest</groupId>
+      <artifactId>unirest-java</artifactId>
+      <version>${unirest.version}</version>
+    </dependency>
+    
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java
index 549b5f2..e3918e4 100644
--- a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java
+++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java
@@ -18,7 +18,6 @@
 package org.apache.zeppelin.elasticsearch;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -33,27 +32,20 @@ import java.util.TreeSet;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.elasticsearch.action.ActionResponse;
+import org.apache.zeppelin.elasticsearch.action.AggWrapper;
+import org.apache.zeppelin.elasticsearch.action.HitWrapper;
+import org.apache.zeppelin.elasticsearch.client.ElasticsearchClient;
+import org.apache.zeppelin.elasticsearch.client.HttpBasedClient;
+import org.apache.zeppelin.elasticsearch.client.TransportBasedClient;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.elasticsearch.action.delete.DeleteResponse;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.action.search.SearchAction;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentHelper;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHitField;
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
@@ -66,7 +58,7 @@ import org.slf4j.LoggerFactory;
 import com.github.wnameless.json.flattener.JsonFlattener;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
-import com.google.gson.JsonParseException;
+import com.google.gson.JsonObject;
 
 
 /**
@@ -77,75 +69,82 @@ public class ElasticsearchInterpreter extends Interpreter {
   private static Logger logger = LoggerFactory.getLogger(ElasticsearchInterpreter.class);
 
   private static final String HELP = "Elasticsearch interpreter:\n"
-    + "General format: <command> /<indices>/<types>/<id> <option> <JSON>\n"
-    + "  - indices: list of indices separated by commas (depends on the command)\n"
-    + "  - types: list of document types separated by commas (depends on the command)\n"
-    + "Commands:\n"
-    + "  - search /indices/types <query>\n"
-    + "    . indices and types can be omitted (at least, you have to provide '/')\n"
-    + "    . a query is either a JSON-formatted query, nor a lucene query\n"
-    + "  - size <value>\n"
-    + "    . defines the size of the result set (default value is in the config)\n"
-    + "    . if used, this command must be declared before a search command\n"
-    + "  - count /indices/types <query>\n"
-    + "    . same comments as for the search\n"
-    + "  - get /index/type/id\n"
-    + "  - delete /index/type/id\n"
-    + "  - index /ndex/type/id <json-formatted document>\n"
-    + "    . the id can be omitted, elasticsearch will generate one";
+      + "General format: <command> /<indices>/<types>/<id> <option> <JSON>\n"
+      + "  - indices: list of indices separated by commas (depends on the command)\n"
+      + "  - types: list of document types separated by commas (depends on the command)\n"
+      + "Commands:\n"
+      + "  - search /indices/types <query>\n"
+      + "    . indices and types can be omitted (at least, you have to provide '/')\n"
+      + "    . a query is either a JSON-formatted query, nor a lucene query\n"
+      + "  - size <value>\n"
+      + "    . defines the size of the result set (default value is in the config)\n"
+      + "    . if used, this command must be declared before a search command\n"
+      + "  - count /indices/types <query>\n"
+      + "    . same comments as for the search\n"
+      + "  - get /index/type/id\n"
+      + "  - delete /index/type/id\n"
+      + "  - index /ndex/type/id <json-formatted document>\n"
+      + "    . the id can be omitted, elasticsearch will generate one";
 
   protected static final List<String> COMMANDS = Arrays.asList(
-    "count", "delete", "get", "help", "index", "search");
+      "count", "delete", "get", "help", "index", "search");
 
   private static final Pattern FIELD_NAME_PATTERN = Pattern.compile("\\[\\\\\"(.+)\\\\\"\\](.*)");
 
 
   public static final String ELASTICSEARCH_HOST = "elasticsearch.host";
   public static final String ELASTICSEARCH_PORT = "elasticsearch.port";
+  public static final String ELASTICSEARCH_CLIENT_TYPE = "elasticsearch.client.type";
   public static final String ELASTICSEARCH_CLUSTER_NAME = "elasticsearch.cluster.name";
   public static final String ELASTICSEARCH_RESULT_SIZE = "elasticsearch.result.size";
+  public static final String ELASTICSEARCH_BASIC_AUTH_USERNAME = "elasticsearch.basicauth.username";
+  public static final String ELASTICSEARCH_BASIC_AUTH_PASSWORD = "elasticsearch.basicauth.password";
 
   private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
-  private Client client;
-  private String host = "localhost";
-  private int port = 9300;
-  private String clusterName = "elasticsearch";
+  private ElasticsearchClient elsClient;
   private int resultSize = 10;
 
   public ElasticsearchInterpreter(Properties property) {
     super(property);
-    this.host = getProperty(ELASTICSEARCH_HOST);
-    this.port = Integer.parseInt(getProperty(ELASTICSEARCH_PORT));
-    this.clusterName = getProperty(ELASTICSEARCH_CLUSTER_NAME);
+
+  }
+
+  @Override
+  public void open() {
+    logger.info("Properties: {}", getProperty());
+
+    String clientType = getProperty(ELASTICSEARCH_CLIENT_TYPE);
+    clientType = clientType == null ? null : clientType.toLowerCase();
+
     try {
       this.resultSize = Integer.parseInt(getProperty(ELASTICSEARCH_RESULT_SIZE));
-    } catch (NumberFormatException e) {
+    }
+    catch (final NumberFormatException e) {
       this.resultSize = 10;
       logger.error("Unable to parse " + ELASTICSEARCH_RESULT_SIZE + " : " +
-        property.get(ELASTICSEARCH_RESULT_SIZE), e);
+          property.get(ELASTICSEARCH_RESULT_SIZE), e);
     }
-  }
 
-  @Override
-  public void open() {
     try {
-      logger.info("prop={}", getProperty());
-      final Settings settings = Settings.settingsBuilder()
-        .put("cluster.name", clusterName)
-        .put(getProperty())
-        .build();
-      client = TransportClient.builder().settings(settings).build()
-        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
-    }
-    catch (IOException e) {
+      if (StringUtils.isEmpty(clientType) || "transport".equals(clientType)) {
+        elsClient = new TransportBasedClient(getProperty());
+      }
+      else if ("http".equals(clientType)) {
+        elsClient = new HttpBasedClient(getProperty());
+      }
+      else {
+        logger.error("Unknown type of Elasticsearch client: " + clientType);
+      }
+    }
+    catch (final IOException e) {
       logger.error("Open connection with Elasticsearch", e);
     }
   }
 
   @Override
   public void close() {
-    if (client != null) {
-      client.close();
+    if (elsClient != null) {
+      elsClient.close();
     }
   }
 
@@ -159,7 +158,7 @@ public class ElasticsearchInterpreter extends Interpreter {
 
     int currentResultSize = resultSize;
 
-    if (client == null) {
+    if (elsClient == null) {
       return new InterpreterResult(InterpreterResult.Code.ERROR,
         "Problem with the Elasticsearch client, please check your configuration (host, port,...)");
     }
@@ -178,7 +177,7 @@ public class ElasticsearchInterpreter extends Interpreter {
 
       if (lines.length < 2) {
         return processHelp(InterpreterResult.Code.ERROR,
-                           "Size cmd must be followed by a search");
+            "Size cmd must be followed by a search");
       }
 
       final String[] sizeLine = StringUtils.split(lines[0], " ", 2);
@@ -202,13 +201,13 @@ public class ElasticsearchInterpreter extends Interpreter {
 
     try {
       if ("get".equalsIgnoreCase(method)) {
-        return processGet(urlItems);
+        return processGet(urlItems, interpreterContext);
       }
       else if ("count".equalsIgnoreCase(method)) {
-        return processCount(urlItems, data);
+        return processCount(urlItems, data, interpreterContext);
       }
       else if ("search".equalsIgnoreCase(method)) {
-        return processSearch(urlItems, data, currentResultSize);
+        return processSearch(urlItems, data, currentResultSize, interpreterContext);
       }
       else if ("index".equalsIgnoreCase(method)) {
         return processIndex(urlItems, data);
@@ -219,7 +218,7 @@ public class ElasticsearchInterpreter extends Interpreter {
 
       return processHelp(InterpreterResult.Code.ERROR, "Unknown command");
     }
-    catch (Exception e) {
+    catch (final Exception e) {
       return new InterpreterResult(InterpreterResult.Code.ERROR, "Error : " + e.getMessage());
     }
   }
@@ -243,7 +242,7 @@ public class ElasticsearchInterpreter extends Interpreter {
   public List<InterpreterCompletion> completion(String s, int i) {
     final List suggestions = new ArrayList<>();
 
-    for (String cmd : COMMANDS) {
+    for (final String cmd : COMMANDS) {
       if (cmd.toLowerCase().contains(s)) {
         suggestions.add(new InterpreterCompletion(cmd, cmd));
       }
@@ -251,6 +250,31 @@ public class ElasticsearchInterpreter extends Interpreter {
     return suggestions;
   }
 
+  private void addAngularObject(InterpreterContext interpreterContext, String prefix, Object obj) {
+    interpreterContext.getAngularObjectRegistry().add(
+        prefix + "_" + interpreterContext.getParagraphId().replace("-", "_"),
+        obj, null, null);
+  }
+
+  private String[] getIndexTypeId(String[] urlItems) {
+
+    if (urlItems.length < 3) {
+      return null;
+    }
+
+    final String index = urlItems[0];
+    final String type = urlItems[1];
+    final String id = StringUtils.join(Arrays.copyOfRange(urlItems, 2, urlItems.length), '/');
+
+    if (StringUtils.isEmpty(index)
+        || StringUtils.isEmpty(type)
+        || StringUtils.isEmpty(id)) {
+      return null;
+    }
+
+    return new String[] { index, type, id };
+  }
+
   private InterpreterResult processHelp(InterpreterResult.Code code, String additionalMessage) {
     final StringBuffer buffer = new StringBuffer();
 
@@ -267,28 +291,30 @@ public class ElasticsearchInterpreter extends Interpreter {
    * Processes a "get" request.
    *
    * @param urlItems Items of the URL
+   * @param interpreterContext Instance of the context
    * @return Result of the get request, it contains a JSON-formatted string
    */
-  private InterpreterResult processGet(String[] urlItems) {
+  private InterpreterResult processGet(String[] urlItems, InterpreterContext interpreterContext) {
+
+    final String[] indexTypeId = getIndexTypeId(urlItems);
 
-    if (urlItems.length != 3
-        || StringUtils.isEmpty(urlItems[0])
-        || StringUtils.isEmpty(urlItems[1])
-        || StringUtils.isEmpty(urlItems[2])) {
+    if (indexTypeId == null) {
       return new InterpreterResult(InterpreterResult.Code.ERROR,
-                                   "Bad URL (it should be /index/type/id)");
+          "Bad URL (it should be /index/type/id)");
     }
 
-    final GetResponse response = client
-      .prepareGet(urlItems[0], urlItems[1], urlItems[2])
-      .get();
-    if (response.isExists()) {
-      final String json = gson.toJson(response.getSource());
+    final ActionResponse response = elsClient.get(indexTypeId[0], indexTypeId[1], indexTypeId[2]);
+
+    if (response.isSucceeded()) {
+      final JsonObject json = response.getHit().getSourceAsJsonObject();
+      final String jsonStr = gson.toJson(json);
+
+      addAngularObject(interpreterContext, "get", json);
 
       return new InterpreterResult(
-                    InterpreterResult.Code.SUCCESS,
-                    InterpreterResult.Type.TEXT,
-                    json);
+          InterpreterResult.Code.SUCCESS,
+          InterpreterResult.Type.TEXT,
+          jsonStr);
     }
 
     return new InterpreterResult(InterpreterResult.Code.ERROR, "Document not found");
@@ -299,21 +325,25 @@ public class ElasticsearchInterpreter extends Interpreter {
    *
    * @param urlItems Items of the URL
    * @param data May contains the JSON of the request
+   * @param interpreterContext Instance of the context
    * @return Result of the count request, it contains the total hits
    */
-  private InterpreterResult processCount(String[] urlItems, String data) {
+  private InterpreterResult processCount(String[] urlItems, String data,
+      InterpreterContext interpreterContext) {
 
     if (urlItems.length > 2) {
       return new InterpreterResult(InterpreterResult.Code.ERROR,
-                                   "Bad URL (it should be /index1,index2,.../type1,type2,...)");
+          "Bad URL (it should be /index1,index2,.../type1,type2,...)");
     }
 
-    final SearchResponse response = searchData(urlItems, data, 0);
+    final ActionResponse response = searchData(urlItems, data, 0);
+
+    addAngularObject(interpreterContext, "count", response.getTotalHits());
 
     return new InterpreterResult(
-      InterpreterResult.Code.SUCCESS,
-      InterpreterResult.Type.TEXT,
-      "" + response.getHits().getTotalHits());
+        InterpreterResult.Code.SUCCESS,
+        InterpreterResult.Type.TEXT,
+        "" + response.getTotalHits());
   }
 
   /**
@@ -322,16 +352,22 @@ public class ElasticsearchInterpreter extends Interpreter {
    * @param urlItems Items of the URL
    * @param data May contains the JSON of the request
    * @param size Limit of result set
+   * @param interpreterContext Instance of the context
    * @return Result of the search request, it contains a tab-formatted string of the matching hits
    */
-  private InterpreterResult processSearch(String[] urlItems, String data, int size) {
+  private InterpreterResult processSearch(String[] urlItems, String data, int size,
+      InterpreterContext interpreterContext) {
 
     if (urlItems.length > 2) {
       return new InterpreterResult(InterpreterResult.Code.ERROR,
-                                   "Bad URL (it should be /index1,index2,.../type1,type2,...)");
+          "Bad URL (it should be /index1,index2,.../type1,type2,...)");
     }
 
-    final SearchResponse response = searchData(urlItems, data, size);
+    final ActionResponse response = searchData(urlItems, data, size);
+
+    addAngularObject(interpreterContext, "search",
+        (response.getAggregations() != null && response.getAggregations().size() > 0) ?
+            response.getAggregations() : response.getHits());
 
     return buildResponseMessage(response);
   }
@@ -347,18 +383,16 @@ public class ElasticsearchInterpreter extends Interpreter {
 
     if (urlItems.length < 2 || urlItems.length > 3) {
       return new InterpreterResult(InterpreterResult.Code.ERROR,
-                                   "Bad URL (it should be /index/type or /index/type/id)");
+          "Bad URL (it should be /index/type or /index/type/id)");
     }
 
-    final IndexResponse response = client
-      .prepareIndex(urlItems[0], urlItems[1], urlItems.length == 2 ? null : urlItems[2])
-      .setSource(data)
-      .get();
+    final ActionResponse response = elsClient.index(
+        urlItems[0], urlItems[1], urlItems.length == 2 ? null : urlItems[2], data);
 
     return new InterpreterResult(
-      InterpreterResult.Code.SUCCESS,
-      InterpreterResult.Type.TEXT,
-      response.getId());
+        InterpreterResult.Code.SUCCESS,
+        InterpreterResult.Type.TEXT,
+        response.getHit().getId());
   }
 
   /**
@@ -369,59 +403,39 @@ public class ElasticsearchInterpreter extends Interpreter {
    */
   private InterpreterResult processDelete(String[] urlItems) {
 
-    if (urlItems.length != 3
-        || StringUtils.isEmpty(urlItems[0])
-        || StringUtils.isEmpty(urlItems[1])
-        || StringUtils.isEmpty(urlItems[2])) {
+    final String[] indexTypeId = getIndexTypeId(urlItems);
+
+    if (indexTypeId == null) {
       return new InterpreterResult(InterpreterResult.Code.ERROR,
-                                   "Bad URL (it should be /index/type/id)");
+          "Bad URL (it should be /index/type/id)");
     }
 
-    final DeleteResponse response = client
-      .prepareDelete(urlItems[0], urlItems[1], urlItems[2])
-      .get();
+    final ActionResponse response =
+        elsClient.delete(indexTypeId[0], indexTypeId[1], indexTypeId[2]);
 
-    if (response.isFound()) {
+    if (response.isSucceeded()) {
       return new InterpreterResult(
-        InterpreterResult.Code.SUCCESS,
-        InterpreterResult.Type.TEXT,
-        response.getId());
+          InterpreterResult.Code.SUCCESS,
+          InterpreterResult.Type.TEXT,
+          response.getHit().getId());
     }
 
     return new InterpreterResult(InterpreterResult.Code.ERROR, "Document not found");
   }
 
-  private SearchResponse searchData(String[] urlItems, String query, int size) {
+  private ActionResponse searchData(String[] urlItems, String query, int size) {
 
-    final SearchRequestBuilder reqBuilder = new SearchRequestBuilder(
-      client, SearchAction.INSTANCE);
-    reqBuilder.setIndices();
+    String[] indices = null;
+    String[] types = null;
 
     if (urlItems.length >= 1) {
-      reqBuilder.setIndices(StringUtils.split(urlItems[0], ","));
+      indices = StringUtils.split(urlItems[0], ",");
     }
     if (urlItems.length > 1) {
-      reqBuilder.setTypes(StringUtils.split(urlItems[1], ","));
+      types = StringUtils.split(urlItems[1], ",");
     }
 
-    if (!StringUtils.isEmpty(query)) {
-      // The query can be either JSON-formatted, nor a Lucene query
-      // So, try to parse as a JSON => if there is an error, consider the query a Lucene one
-      try {
-        final Map source = gson.fromJson(query, Map.class);
-        reqBuilder.setExtraSource(source);
-      }
-      catch (JsonParseException e) {
-        // This is not a JSON (or maybe not well formatted...)
-        reqBuilder.setQuery(QueryBuilders.queryStringQuery(query).analyzeWildcard(true));
-      }
-    }
-
-    reqBuilder.setSize(size);
-
-    final SearchResponse response = reqBuilder.get();
-
-    return response;
+    return elsClient.search(indices, types, query, size);
   }
 
   private InterpreterResult buildAggResponseMessage(Aggregations aggregations) {
@@ -442,8 +456,8 @@ public class ElasticsearchInterpreter extends Interpreter {
       final Set<String> headerKeys = new HashSet<>();
       final List<Map<String, Object>> buckets = new LinkedList<>();
       final InternalMultiBucketAggregation multiBucketAgg = (InternalMultiBucketAggregation) agg;
-      
-      for (MultiBucketsAggregation.Bucket bucket : multiBucketAgg.getBuckets()) {
+
+      for (final MultiBucketsAggregation.Bucket bucket : multiBucketAgg.getBuckets()) {
         try {
           final XContentBuilder builder = XContentFactory.jsonBuilder();
           bucket.toXContent(builder, null);
@@ -451,22 +465,22 @@ public class ElasticsearchInterpreter extends Interpreter {
           headerKeys.addAll(bucketMap.keySet());
           buckets.add(bucketMap);
         }
-        catch (IOException e) {
+        catch (final IOException e) {
           logger.error("Processing bucket: " + e.getMessage(), e);
         }
       }
-            
+
       final StringBuffer buffer = new StringBuffer();
       final String[] keys = headerKeys.toArray(new String[0]);
-      for (String key: keys) {
+      for (final String key: keys) {
         buffer.append("\t" + key);
       }
       buffer.deleteCharAt(0);
-      
-      for (Map<String, Object> bucket : buckets) {
+
+      for (final Map<String, Object> bucket : buckets) {
         buffer.append("\n");
-        
-        for (String key: keys) {
+
+        for (final String key: keys) {
           buffer.append(bucket.get(key)).append("\t");
         }
         buffer.deleteCharAt(buffer.length() - 1);
@@ -479,38 +493,64 @@ public class ElasticsearchInterpreter extends Interpreter {
     return new InterpreterResult(InterpreterResult.Code.SUCCESS, resType, resMsg);
   }
 
-  private String buildSearchHitsResponseMessage(SearchHit[] hits) {
+  private InterpreterResult buildAggResponseMessage(List<AggWrapper> aggregations) {
+
+    final InterpreterResult.Type resType = InterpreterResult.Type.TABLE;
+    String resMsg = "";
+
+    final Set<String> headerKeys = new HashSet<>();
+    final List<Map<String, Object>> buckets = new LinkedList<>();
+
+    for (final AggWrapper aggregation: aggregations) {
+      final Map<String, Object> bucketMap = JsonFlattener.flattenAsMap(aggregation.getResult());
+      headerKeys.addAll(bucketMap.keySet());
+      buckets.add(bucketMap);
+    }
+
+    final StringBuffer buffer = new StringBuffer();
+    final String[] keys = headerKeys.toArray(new String[0]);
+    for (final String key: keys) {
+      buffer.append("\t" + key);
+    }
+    buffer.deleteCharAt(0);
+
+    for (final Map<String, Object> bucket : buckets) {
+      buffer.append("\n");
+
+      for (final String key: keys) {
+        buffer.append(bucket.get(key)).append("\t");
+      }
+      buffer.deleteCharAt(buffer.length() - 1);
+    }
+
+    resMsg = buffer.toString();
+
+    return new InterpreterResult(InterpreterResult.Code.SUCCESS, resType, resMsg);
+  }
+
+  private String buildSearchHitsResponseMessage(ActionResponse response) {
 
-    if (hits == null || hits.length == 0) {
+    if (response.getHits() == null || response.getHits().size() == 0) {
       return "";
     }
 
     //First : get all the keys in order to build an ordered list of the values for each hit
     //
-    final Map<String, Object> hitFields = new HashMap<>();
     final List<Map<String, Object>> flattenHits = new LinkedList<>();
     final Set<String> keys = new TreeSet<>();
-    for (SearchHit hit : hits) {
-      // Fields can be found either in _source, or in fields (it depends on the query)
-      //
-      String json = hit.getSourceAsString();
-      if (json == null) {
-        hitFields.clear();
-        for (SearchHitField hitField : hit.getFields().values()) {
-          hitFields.put(hitField.getName(), hitField.getValues());
-        }
-        json = gson.toJson(hitFields);
-      }
+    for (final HitWrapper hit : response.getHits()) {
+
+      final String json = hit.getSourceAsString();
 
       final Map<String, Object> flattenJsonMap = JsonFlattener.flattenAsMap(json);
       final Map<String, Object> flattenMap = new HashMap<>();
-      for (Iterator<String> iter = flattenJsonMap.keySet().iterator(); iter.hasNext(); ) {
+      for (final Iterator<String> iter = flattenJsonMap.keySet().iterator(); iter.hasNext(); ) {
         // Replace keys that match a format like that : [\"keyname\"][0]
         final String fieldName = iter.next();
         final Matcher fieldNameMatcher = FIELD_NAME_PATTERN.matcher(fieldName);
         if (fieldNameMatcher.matches()) {
           flattenMap.put(fieldNameMatcher.group(1) + fieldNameMatcher.group(2),
-            flattenJsonMap.get(fieldName));
+              flattenJsonMap.get(fieldName));
         }
         else {
           flattenMap.put(fieldName, flattenJsonMap.get(fieldName));
@@ -518,7 +558,7 @@ public class ElasticsearchInterpreter extends Interpreter {
       }
       flattenHits.add(flattenMap);
 
-      for (String key : flattenMap.keySet()) {
+      for (final String key : flattenMap.keySet()) {
         keys.add(key);
       }
     }
@@ -526,15 +566,15 @@ public class ElasticsearchInterpreter extends Interpreter {
     // Next : build the header of the table
     //
     final StringBuffer buffer = new StringBuffer();
-    for (String key : keys) {
+    for (final String key : keys) {
       buffer.append(key).append('\t');
     }
     buffer.replace(buffer.lastIndexOf("\t"), buffer.lastIndexOf("\t") + 1, "\n");
 
     // Finally : build the result by using the key set
     //
-    for (Map<String, Object> hit : flattenHits) {
-      for (String key : keys) {
+    for (final Map<String, Object> hit : flattenHits) {
+      for (final String key : keys) {
         final Object val = hit.get(key);
         if (val != null) {
           buffer.append(val);
@@ -547,17 +587,17 @@ public class ElasticsearchInterpreter extends Interpreter {
     return buffer.toString();
   }
 
-  private InterpreterResult buildResponseMessage(SearchResponse response) {
+  private InterpreterResult buildResponseMessage(ActionResponse response) {
 
-    final Aggregations aggregations = response.getAggregations();
+    final List<AggWrapper> aggregations = response.getAggregations();
 
-    if (aggregations != null && aggregations.asList().size() > 0) {
+    if (aggregations != null && aggregations.size() > 0) {
       return buildAggResponseMessage(aggregations);
     }
 
     return new InterpreterResult(
-      InterpreterResult.Code.SUCCESS,
-      InterpreterResult.Type.TABLE,
-      buildSearchHitsResponseMessage(response.getHits().getHits()));
+        InterpreterResult.Code.SUCCESS,
+        InterpreterResult.Type.TABLE,
+        buildSearchHitsResponseMessage(response));
   }
 }
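
The table-building steps in buildSearchHitsResponseMessage (collect the sorted key set, emit a header row, then one row per hit) can be sketched in isolation as follows. This is a minimal illustration, not the interpreter's code: the class name `HitsTableSketch` and the method `buildTable` are hypothetical, the row-termination logic hidden by the hunk boundary above is assumed rather than copied, and `String.join` assumes Java 8 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper; it only mirrors the idea of the method shown in the diff. */
class HitsTableSketch {

  /** Builds a tab-separated table: one sorted header row, then one row per hit. */
  static String buildTable(List<Map<String, Object>> flattenHits) {
    // Collect every key seen in any hit; a TreeSet keeps the columns sorted.
    final Set<String> keys = new TreeSet<>();
    for (final Map<String, Object> hit : flattenHits) {
      keys.addAll(hit.keySet());
    }
    if (keys.isEmpty()) {
      return "";
    }

    // Header row: column names separated by tabs.
    final StringBuilder buffer = new StringBuilder();
    buffer.append(String.join("\t", keys)).append('\n');

    // One row per hit; a key missing from a hit yields an empty cell.
    for (final Map<String, Object> hit : flattenHits) {
      final List<String> cells = new ArrayList<>();
      for (final String key : keys) {
        final Object val = hit.get(key);
        cells.add(val == null ? "" : val.toString());
      }
      buffer.append(String.join("\t", cells)).append('\n');
    }
    return buffer.toString();
  }
}
```

Using the union of all keys as the column set keeps every row the same width even when hits carry different fields.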

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionException.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionException.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionException.java
new file mode 100644
index 0000000..6846d0a
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.elasticsearch.action;
+
+/**
+ * Runtime exception thrown when there is a problem during an action (search, get, ...).
+ */
+public class ActionException extends RuntimeException {
+
+  public ActionException(String message) {
+    super(message);
+  }
+
+  public ActionException(Throwable cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionResponse.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionResponse.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionResponse.java
new file mode 100644
index 0000000..4141bce
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionResponse.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.elasticsearch.action;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Contains the result of an action (hits, aggregations, ...).
+ */
+public class ActionResponse {
+
+  private boolean succeeded;
+  private long totalHits;
+  private final List<HitWrapper> hits = new LinkedList<>();
+  private final List<AggWrapper> aggregations = new LinkedList<>();
+
+
+  public ActionResponse succeeded(boolean succeeded) {
+    this.succeeded = succeeded;
+    return this;
+  }
+
+  public boolean isSucceeded() {
+    return succeeded;
+  }
+
+  public ActionResponse totalHits(long totalHits) {
+    this.totalHits = totalHits;
+    return this;
+  }
+
+  public long getTotalHits() {
+    return totalHits;
+  }
+
+  public List<HitWrapper> getHits() {
+    return hits;
+  }
+
+  public ActionResponse addHit(HitWrapper hit) {
+    this.hits.add(hit);
+    return this;
+  }
+
+  public List<AggWrapper> getAggregations() {
+    return aggregations;
+  }
+
+  public ActionResponse addAggregation(AggWrapper aggregation) {
+    this.aggregations.add(aggregation);
+    return this;
+  }
+
+  public ActionResponse hit(HitWrapper hit) {
+    this.addHit(hit);
+    return this;
+  }
+
+  public HitWrapper getHit() {
+    return this.hits.get(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/AggWrapper.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/AggWrapper.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/AggWrapper.java
new file mode 100644
index 0000000..14446db
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/AggWrapper.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.elasticsearch.action;
+
+/**
+ * Contains the result of an aggregation.
+ */
+public class AggWrapper {
+
+  /** Type of an aggregation (to know if there are buckets or not). */
+  public enum AggregationType { SIMPLE, MULTI_BUCKETS }
+
+  private final AggregationType type;
+  private final String result;
+
+  public AggWrapper(AggregationType type, String result) {
+    this.type = type;
+    this.result = result;
+  }
+
+  public AggregationType getType() {
+    return type;
+  }
+
+  public String getResult() {
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/HitWrapper.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/HitWrapper.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/HitWrapper.java
new file mode 100644
index 0000000..3be4514
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/HitWrapper.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.elasticsearch.action;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Contains the data of a hit.
+ */
+public class HitWrapper {
+
+  private final JsonParser parser = new JsonParser();
+
+  private final String index;
+  private final String type;
+  private final String id;
+  private final String source;
+
+  public HitWrapper(String index, String type, String id, String source) {
+    this.index = index;
+    this.type = type;
+    this.id = id;
+    this.source = source;
+  }
+
+  public HitWrapper(String source) {
+    this(null, null, null, source);
+  }
+
+  public String getSourceAsString() {
+    return source;
+  }
+
+  public JsonObject getSourceAsJsonObject() {
+    final JsonElement element = parser.parse(source);
+    return element.getAsJsonObject();
+  }
+
+  public String getIndex() {
+    return index;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public String getId() {
+    return id;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/ElasticsearchClient.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/ElasticsearchClient.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/ElasticsearchClient.java
new file mode 100644
index 0000000..48e1980
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/ElasticsearchClient.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.elasticsearch.client;
+
+import org.apache.zeppelin.elasticsearch.action.ActionResponse;
+
+/**
+ * Interface that must be implemented by any kind of Elasticsearch client (transport, ...).
+ */
+public interface ElasticsearchClient {
+
+  ActionResponse get(String index, String type, String id);
+
+  ActionResponse index(String index, String type, String id, String data);
+
+  ActionResponse delete(String index, String type, String id);
+
+  ActionResponse search(String[] indices, String[] types, String query, int size);
+
+  void close();
+}
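
To illustrate why a client interface like this is useful, here is a rough sketch of an in-memory implementation that could stand in for a real cluster when exercising calling code. Everything in it is an assumption made for illustration: `DocClient` is a reduced, hypothetical stand-in (plain strings instead of ActionResponse, no search), and `InMemoryDocClient` is not part of this commit.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, reduced stand-in for ElasticsearchClient; not the interface from this commit. */
interface DocClient {
  /** Returns the stored source, or null when the document does not exist. */
  String get(String index, String type, String id);

  /** Stores the document and returns the id that was used. */
  String index(String index, String type, String id, String data);

  /** Returns true when a document was actually removed. */
  boolean delete(String index, String type, String id);
}

/** In-memory implementation, e.g. for exercising caller code without a cluster. */
class InMemoryDocClient implements DocClient {

  private final Map<String, String> docs = new HashMap<>();
  private int nextId = 1;

  private static String key(String index, String type, String id) {
    return index + "/" + type + "/" + id;
  }

  @Override
  public String get(String index, String type, String id) {
    return docs.get(key(index, type, id));
  }

  @Override
  public String index(String index, String type, String id, String data) {
    // No id supplied: generate one, as Elasticsearch does when a document is indexed without an id.
    final String usedId = (id == null || id.isEmpty()) ? "auto-" + (nextId++) : id;
    docs.put(key(index, type, usedId), data);
    return usedId;
  }

  @Override
  public boolean delete(String index, String type, String id) {
    return docs.remove(key(index, type, id)) != null;
  }
}
```

Code written against the interface does not need to know which implementation it receives, which is the same property that lets the interpreter switch between the transport and HTTP clients.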

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/HttpBasedClient.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/HttpBasedClient.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/HttpBasedClient.java
new file mode 100644
index 0000000..d691597
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/HttpBasedClient.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.elasticsearch.client;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter;
+import org.apache.zeppelin.elasticsearch.action.ActionException;
+import org.apache.zeppelin.elasticsearch.action.ActionResponse;
+import org.apache.zeppelin.elasticsearch.action.AggWrapper;
+import org.apache.zeppelin.elasticsearch.action.AggWrapper.AggregationType;
+import org.apache.zeppelin.elasticsearch.action.HitWrapper;
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+import com.google.common.base.Joiner;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParseException;
+import com.mashape.unirest.http.HttpResponse;
+import com.mashape.unirest.http.JsonNode;
+import com.mashape.unirest.http.Unirest;
+import com.mashape.unirest.http.exceptions.UnirestException;
+import com.mashape.unirest.request.HttpRequest;
+import com.mashape.unirest.request.HttpRequestWithBody;
+
+/**
+ * Elasticsearch client using the HTTP API.
+ */
+public class HttpBasedClient implements ElasticsearchClient {
+
+  private static final String QUERY_STRING_TEMPLATE =
+      "{ \"query\": { \"query_string\": { \"query\": \"_Q_\", \"analyze_wildcard\": \"true\" } } }";
+
+  private final String host;
+  private final int port;
+  private final String username;
+  private final String password;
+
+  private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
+
+  public HttpBasedClient(Properties props) {
+    this.host = props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_HOST);
+    this.port = Integer.parseInt(props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_PORT));
+    this.username = props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_BASIC_AUTH_USERNAME);
+    this.password = props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_BASIC_AUTH_PASSWORD);
+  }
+
+  private boolean isSucceeded(HttpResponse<?> response) {
+    return response.getStatus() >= 200 && response.getStatus() < 300;
+  }
+
+  private JSONObject getParentField(JSONObject parent, String[] fields) {
+    JSONObject obj = parent;
+    for (int i = 0; i < fields.length - 1; i++) {
+      obj = obj.getJSONObject(fields[i]);
+    }
+    return obj;
+  }
+
+  private JSONArray getFieldAsArray(JSONObject obj, String field) {
+    final String[] fields = field.split("/");
+    final JSONObject parent = getParentField(obj, fields);
+    return parent.getJSONArray(fields[fields.length - 1]);
+  }
+
+  private String getFieldAsString(HttpResponse<JsonNode> response, String field) {
+    return getFieldAsString(response.getBody(), field);
+  }
+
+  private String getFieldAsString(JsonNode json, String field) {
+    return json.getObject().get(field).toString();
+  }
+
+  private long getFieldAsLong(HttpResponse<JsonNode> response, String field) {
+    final String[] fields = field.split("/");
+    final JSONObject obj = getParentField(response.getBody().getObject(), fields);
+    return obj.getLong(fields[fields.length - 1]);
+  }
+
+  private String getUrl(String index, String type, String id, boolean useSearch) {
+    try {
+      final StringBuilder buffer = new StringBuilder();
+      buffer.append("http://").append(host).append(":").append(port).append("/");
+      if (StringUtils.isNotEmpty(index)) {
+        buffer.append(index);
+
+        if (StringUtils.isNotEmpty(type)) {
+          buffer.append("/").append(type);
+
+          if (StringUtils.isNotEmpty(id)) {
+            if (useSearch) {
+              final String encodedId = URLEncoder.encode(id, "UTF-8");
+              if (id.equals(encodedId)) {
+                // No difference: use the id directly
+                buffer.append("/").append(id);
+              }
+              else {
+                // There are differences: to avoid problems with some special characters
+                // such as / and # in id, use a "terms" query
+                buffer.append("/_search?source=")
+                  .append(URLEncoder
+                      .encode("{\"query\":{\"terms\":{\"_id\":[\"" + id + "\"]}}}", "UTF-8"));
+              }
+            }
+            else {
+              buffer.append("/").append(id);
+            }
+          }
+        }
+      }
+      return buffer.toString();
+    }
+    catch (final UnsupportedEncodingException e) {
+      throw new ActionException(e);
+    }
+  }
+
+  private String getUrl(String[] indices, String[] types) {
+    final String inds = indices == null ? null : Joiner.on(",").join(indices);
+    final String typs = types == null ? null : Joiner.on(",").join(types);
+    return getUrl(inds, typs, null, false);
+  }
+
+  @Override
+  public ActionResponse get(String index, String type, String id) {
+    ActionResponse response = null;
+    try {
+      final HttpRequest request = Unirest.get(getUrl(index, type, id, true));
+      if (StringUtils.isNotEmpty(username)) {
+        request.basicAuth(username, password);
+      }
+
+      final HttpResponse<String> result = request.asString();
+      final boolean isSucceeded = isSucceeded(result);
+
+      if (isSucceeded) {
+        final JsonNode body = new JsonNode(result.getBody());
+        if (body.getObject().has("_index")) {
+          response = new ActionResponse()
+              .succeeded(true)
+              .hit(new HitWrapper(
+                  getFieldAsString(body, "_index"),
+                  getFieldAsString(body, "_type"),
+                  getFieldAsString(body, "_id"),
+                  getFieldAsString(body, "_source")));
+        }
+        else {
+          final JSONArray hits = getFieldAsArray(body.getObject(), "hits/hits");
+          final JSONObject hit = (JSONObject) hits.iterator().next();
+          response = new ActionResponse()
+              .succeeded(true)
+              .hit(new HitWrapper(
+                  hit.getString("_index"),
+                  hit.getString("_type"),
+                  hit.getString("_id"),
+                  hit.opt("_source").toString()));
+        }
+      }
+      else {
+        if (result.getStatus() == 404) {
+          response = new ActionResponse()
+              .succeeded(false);
+        }
+        else {
+          throw new ActionException(result.getBody());
+        }
+      }
+    }
+    catch (final UnirestException e) {
+      throw new ActionException(e);
+    }
+    return response;
+  }
+
+  @Override
+  public ActionResponse delete(String index, String type, String id) {
+    ActionResponse response = null;
+    try {
+      final HttpRequest request = Unirest.delete(getUrl(index, type, id, true));
+      if (StringUtils.isNotEmpty(username)) {
+        request.basicAuth(username, password);
+      }
+
+      final HttpResponse<String> result = request.asString();
+      final boolean isSucceeded = isSucceeded(result);
+
+      if (isSucceeded) {
+        final JsonNode body = new JsonNode(result.getBody());
+        response = new ActionResponse()
+            .succeeded(true)
+            .hit(new HitWrapper(
+                getFieldAsString(body, "_index"),
+                getFieldAsString(body, "_type"),
+                getFieldAsString(body, "_id"),
+                null));
+      }
+      else {
+        throw new ActionException(result.getBody());
+      }
+    }
+    catch (final UnirestException e) {
+      throw new ActionException(e);
+    }
+    return response;
+  }
+
+  @Override
+  public ActionResponse index(String index, String type, String id, String data) {
+    ActionResponse response = null;
+    try {
+      HttpRequestWithBody request = null;
+      if (StringUtils.isEmpty(id)) {
+        request = Unirest.post(getUrl(index, type, id, false));
+      }
+      else {
+        request = Unirest.put(getUrl(index, type, id, false));
+      }
+      request
+          .header("Accept", "application/json")
+          .header("Content-Type", "application/json")
+          .body(data).getHttpRequest();
+      if (StringUtils.isNotEmpty(username)) {
+        request.basicAuth(username, password);
+      }
+
+      final HttpResponse<JsonNode> result = request.asJson();
+      final boolean isSucceeded = isSucceeded(result);
+
+      if (isSucceeded) {
+        response = new ActionResponse()
+            .succeeded(true)
+            .hit(new HitWrapper(
+                getFieldAsString(result, "_index"),
+                getFieldAsString(result, "_type"),
+                getFieldAsString(result, "_id"),
+                null));
+      }
+      else {
+        throw new ActionException(result.getBody().toString());
+      }
+    }
+    catch (final UnirestException e) {
+      throw new ActionException(e);
+    }
+    return response;
+  }
+
+  @Override
+  public ActionResponse search(String[] indices, String[] types, String query, int size) {
+    ActionResponse response = null;
+
+    if (!StringUtils.isEmpty(query)) {
+      // The query can be either JSON-formatted or a Lucene query.
+      // So, try to parse it as JSON => if parsing fails, treat the query as a Lucene one
+      try {
+        gson.fromJson(query, Map.class);
+      }
+      catch (final JsonParseException e) {
+        // Not JSON (or malformed JSON): wrap it in a query_string query
+        query = QUERY_STRING_TEMPLATE.replace("_Q_", query);
+      }
+    }
+
+    try {
+      final HttpRequestWithBody request = Unirest
+          .post(getUrl(indices, types) + "/_search?size=" + size)
+          .header("Content-Type", "application/json");
+
+      if (StringUtils.isNotEmpty(query)) {
+        request.header("Accept", "application/json").body(query);
+      }
+
+      if (StringUtils.isNotEmpty(username)) {
+        request.basicAuth(username, password);
+      }
+
+      final HttpResponse<JsonNode> result = request.asJson();
+      final JSONObject body = result.getBody() != null ? result.getBody().getObject() : null;
+
+      if (isSucceeded(result)) {
+        final long total = getFieldAsLong(result, "hits/total");
+
+        response = new ActionResponse()
+            .succeeded(true)
+            .totalHits(total);
+
+        if (containsAggs(result)) {
+          JSONObject aggregationsMap = body.optJSONObject("aggregations");
+          if (aggregationsMap == null) {
+            aggregationsMap = body.getJSONObject("aggs");
+          }
+
+          for (final String key: aggregationsMap.keySet()) {
+            final JSONObject aggResult = aggregationsMap.getJSONObject(key);
+            if (aggResult.has("buckets")) {
+              // Multi-bucket aggregations
+              final Iterator<Object> buckets = aggResult.getJSONArray("buckets").iterator();
+              while (buckets.hasNext()) {
+                response.addAggregation(
+                    new AggWrapper(AggregationType.MULTI_BUCKETS, buckets.next().toString()));
+              }
+            }
+            else {
+              response.addAggregation(
+                  new AggWrapper(AggregationType.SIMPLE, aggregationsMap.toString()));
+            }
+            break; // Keep only one aggregation
+          }
+        }
+        else if (size > 0 && total > 0) {
+          final JSONArray hits = getFieldAsArray(body, "hits/hits");
+          final Iterator<Object> iter = hits.iterator();
+
+          while (iter.hasNext()) {
+            final JSONObject hit = (JSONObject) iter.next();
+            final Object data =
+                hit.opt("_source") != null ? hit.opt("_source") : hit.opt("fields");
+            response.addHit(new HitWrapper(
+                hit.getString("_index"),
+                hit.getString("_type"),
+                hit.getString("_id"),
+                data.toString()));
+          }
+        }
+      }
+      else {
+        throw new ActionException(body.get("error").toString());
+      }
+    }
+    catch (final UnirestException e) {
+      throw new ActionException(e);
+    }
+
+    return response;
+  }
+
+  private boolean containsAggs(HttpResponse<JsonNode> result) {
+    return result.getBody() != null &&
+        (result.getBody().getObject().has("aggregations") ||
+            result.getBody().getObject().has("aggs"));
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public String toString() {
+    return "HttpBasedClient [host=" + host + ", port=" + port + ", username=" + username + "]";
+  }
+}
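
The id handling in getUrl can be summarised by the sketch below: if URL-encoding leaves the id unchanged, the id goes straight into the path; otherwise the request falls back to a `_search` call with a "terms" query on `_id`. The class `IdUrlSketch`, the method `documentUrl`, and the `base` parameter are hypothetical names used only for this illustration; the real method also handles empty index, type, and id values, which are omitted here.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Hypothetical helper mirroring the id-encoding decision described above. */
class IdUrlSketch {

  static String documentUrl(String base, String index, String type, String id) {
    try {
      final String prefix = base + "/" + index + "/" + type;
      final String encodedId = URLEncoder.encode(id, "UTF-8");
      if (id.equals(encodedId)) {
        // Encoding changed nothing, so the id is safe to use as a path segment.
        return prefix + "/" + id;
      }
      // The id contains special characters (for example / or #):
      // look the document up with a "terms" query on _id instead.
      final String query = "{\"query\":{\"terms\":{\"_id\":[\"" + id + "\"]}}}";
      return prefix + "/_search?source=" + URLEncoder.encode(query, "UTF-8");
    }
    catch (final UnsupportedEncodingException e) {
      // UTF-8 is always available on a compliant JVM; rethrow unchecked just in case.
      throw new IllegalStateException(e);
    }
  }
}
```

As in the code shown in the diff, the id is concatenated into the JSON query without escaping, so an id containing a double quote would produce an invalid query; escaping the id first would be a reasonable hardening step.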

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/TransportBasedClient.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/TransportBasedClient.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/TransportBasedClient.java
new file mode 100644
index 0000000..1451019
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/TransportBasedClient.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.elasticsearch.client;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter;
+import org.apache.zeppelin.elasticsearch.action.ActionResponse;
+import org.apache.zeppelin.elasticsearch.action.AggWrapper;
+import org.apache.zeppelin.elasticsearch.action.HitWrapper;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.search.SearchAction;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHitField;
+import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
+import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation;
+import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
+import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonSyntaxException;
+
+/**
+ * Elasticsearch client using the transport protocol.
+ */
+public class TransportBasedClient implements ElasticsearchClient {
+
+  private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
+  private final Client client;
+
+  public TransportBasedClient(Properties props) throws UnknownHostException {
+    final String host =
+        props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_HOST);
+    final int port = Integer.parseInt(
+        props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_PORT));
+    final String clusterName =
+        props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_CLUSTER_NAME);
+
+    final Settings settings = Settings.settingsBuilder()
+        .put("cluster.name", clusterName)
+        .put(props)
+        .build();
+
+    client = TransportClient.builder().settings(settings).build()
+        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
+  }
+
+  @Override
+  public ActionResponse get(String index, String type, String id) {
+    final GetResponse getResp = client
+        .prepareGet(index, type, id)
+        .get();
+
+    return new ActionResponse()
+        .succeeded(getResp.isExists())
+        .hit(new HitWrapper(
+            getResp.getIndex(),
+            getResp.getType(),
+            getResp.getId(),
+            getResp.getSourceAsString()));
+  }
+
+  @Override
+  public ActionResponse delete(String index, String type, String id) {
+    final DeleteResponse delResp = client
+        .prepareDelete(index, type, id)
+        .get();
+
+    return new ActionResponse()
+        .succeeded(delResp.isFound())
+        .hit(new HitWrapper(
+            delResp.getIndex(),
+            delResp.getType(),
+            delResp.getId(),
+            null));
+  }
+
+  @Override
+  public ActionResponse index(String index, String type, String id, String data) {
+    final IndexResponse idxResp = client
+        .prepareIndex(index, type, id)
+        .setSource(data)
+        .get();
+
+    return new ActionResponse()
+        .succeeded(idxResp.isCreated())
+        .hit(new HitWrapper(
+            idxResp.getIndex(),
+            idxResp.getType(),
+            idxResp.getId(),
+            null));
+  }
+
+  @Override
+  public ActionResponse search(String[] indices, String[] types, String query, int size) {
+    final SearchRequestBuilder reqBuilder = new SearchRequestBuilder(
+        client, SearchAction.INSTANCE);
+    reqBuilder.setIndices();
+
+    if (indices != null) {
+      reqBuilder.setIndices(indices);
+    }
+    if (types != null) {
+      reqBuilder.setTypes(types);
+    }
+
+    if (!StringUtils.isEmpty(query)) {
+      // The query can be either JSON-formatted or a Lucene query string
+      // So, try to parse it as JSON => if there is an error, treat the query as a Lucene one
+      try {
+        @SuppressWarnings("rawtypes")
+        final Map source = gson.fromJson(query, Map.class);
+        reqBuilder.setExtraSource(source);
+      }
+      catch (final JsonSyntaxException e) {
+        // This is not JSON (or it is malformed JSON): fall back to a Lucene query string
+        reqBuilder.setQuery(QueryBuilders.queryStringQuery(query).analyzeWildcard(true));
+      }
+    }
+
+    reqBuilder.setSize(size);
+
+    final SearchResponse searchResp = reqBuilder.get();
+
+    final ActionResponse actionResp = new ActionResponse()
+        .succeeded(true)
+        .totalHits(searchResp.getHits().getTotalHits());
+
+    if (searchResp.getAggregations() != null) {
+      setAggregations(searchResp.getAggregations(), actionResp);
+    }
+    else {
+      for (final SearchHit hit: searchResp.getHits()) {
+        // Fields can be found either in _source or in fields (it depends on the query)
+        // => specific to Elasticsearch versions < 5
+        //
+        String src = hit.getSourceAsString();
+        if (src == null) {
+          final Map<String, Object> hitFields = new HashMap<>();
+          for (final SearchHitField hitField : hit.getFields().values()) {
+            hitFields.put(hitField.getName(), hitField.getValues());
+          }
+          src = gson.toJson(hitFields);
+        }
+        actionResp.addHit(new HitWrapper(hit.getIndex(), hit.getType(), hit.getId(), src));
+      }
+    }
+
+    return actionResp;
+  }
+
+  private void setAggregations(Aggregations aggregations, ActionResponse actionResp) {
+    // Only the result of the first aggregation is returned
+    //
+    final Aggregation agg = aggregations.asList().get(0);
+
+    if (agg instanceof InternalMetricsAggregation) {
+      actionResp.addAggregation(new AggWrapper(AggWrapper.AggregationType.SIMPLE,
+          XContentHelper.toString((InternalMetricsAggregation) agg).toString()));
+    }
+    else if (agg instanceof InternalSingleBucketAggregation) {
+      actionResp.addAggregation(new AggWrapper(AggWrapper.AggregationType.SIMPLE,
+          XContentHelper.toString((InternalSingleBucketAggregation) agg).toString()));
+    }
+    else if (agg instanceof InternalMultiBucketAggregation) {
+      final Set<String> headerKeys = new HashSet<>();
+      final List<Map<String, Object>> buckets = new LinkedList<>();
+      final InternalMultiBucketAggregation multiBucketAgg = (InternalMultiBucketAggregation) agg;
+
+      for (final MultiBucketsAggregation.Bucket bucket : multiBucketAgg.getBuckets()) {
+        try {
+          final XContentBuilder builder = XContentFactory.jsonBuilder();
+          bucket.toXContent(builder, null);
+          actionResp.addAggregation(
+              new AggWrapper(AggWrapper.AggregationType.MULTI_BUCKETS, builder.string()));
+        }
+        catch (final IOException e) {
+          // Ignored
+        }
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    if (client != null) {
+      client.close();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "TransportBasedClient []";
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/resources/interpreter-setting.json b/elasticsearch/src/main/resources/interpreter-setting.json
index 089c811..18200ae 100644
--- a/elasticsearch/src/main/resources/interpreter-setting.json
+++ b/elasticsearch/src/main/resources/interpreter-setting.json
@@ -16,6 +16,12 @@
         "defaultValue": "9300",
         "description": "The port for Elasticsearch"
       },
+      "elasticsearch.client.type": {
+        "envName": "ELASTICSEARCH_CLIENT_TYPE",
+        "propertyName": "elasticsearch.client.type",
+        "defaultValue": "transport",
+        "description": "The type of client for Elasticsearch (transport or http)"
+      },
       "elasticsearch.cluster.name": {
         "envName": "ELASTICSEARCH_CLUSTER_NAME",
         "propertyName": "elasticsearch.cluster.name",
@@ -27,6 +33,18 @@
         "propertyName": "elasticsearch.result.size",
         "defaultValue": "10",
         "description": "The size of the result set of a search query"
+      },
+      "elasticsearch.basicauth.username": {
+        "envName": "ELASTICSEARCH_BASIC_AUTH_USERNAME",
+        "propertyName": "elasticsearch.basicauth.username",
+        "defaultValue": "",
+        "description": "Username for basic authentication"
+      },
+      "elasticsearch.basicauth.password": {
+        "envName": "ELASTICSEARCH_BASIC_AUTH_PASSWORD",
+        "propertyName": "elasticsearch.basicauth.password",
+        "defaultValue": "",
+        "description": "Password for basic authentication"
       }
     },
     "editor": {

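Note on the new elasticsearch.client.type setting: the interpreter code that reads this property is not shown in this part of the commit, so the following is only a minimal sketch of how such a setting could be resolved. The ClientTypeSketch class, the ClientType enum and the resolve helper are hypothetical names introduced for illustration; only the property name, its default value ("transport") and the two accepted values (transport, http) come from the diff above.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical helper, not the interpreter's actual code.
final class ClientTypeSketch {

  // The two client flavours named in the setting's description.
  enum ClientType { TRANSPORT, HTTP }

  // Property name taken from interpreter-setting.json.
  static final String CLIENT_TYPE_PROPERTY = "elasticsearch.client.type";

  // Reads the property, falling back to the documented default "transport".
  static ClientType resolve(Properties props) {
    final String raw = props.getProperty(CLIENT_TYPE_PROPERTY, "transport");
    final String normalized = raw.trim().toLowerCase(Locale.ROOT);
    if ("http".equals(normalized)) {
      return ClientType.HTTP;
    }
    if ("transport".equals(normalized)) {
      return ClientType.TRANSPORT;
    }
    // Rejecting unknown values is a design choice of this sketch.
    throw new IllegalArgumentException("Unknown client type: " + raw);
  }

  public static void main(String[] args) {
    final Properties props = new Properties();
    System.out.println(resolve(props));
    props.setProperty(CLIENT_TYPE_PROPERTY, "http");
    System.out.println(resolve(props));
  }
}
```

Failing fast on an unrecognized value is one option; silently falling back to the transport client would be another, and the sketch does not claim to match what the interpreter does.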
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java b/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java
index 8d3a14b..aece163 100644
--- a/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java
+++ b/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.elasticsearch;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -27,8 +28,11 @@ import java.util.Date;
 import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang.math.RandomUtils;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@@ -40,13 +44,19 @@ import org.elasticsearch.node.NodeBuilder;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.experimental.theories.DataPoint;
+import org.junit.experimental.theories.Theories;
+import org.junit.experimental.theories.Theory;
+import org.junit.runner.RunWith;
 
+@RunWith(Theories.class)
 public class ElasticsearchInterpreterTest {
 
+  @DataPoint public static ElasticsearchInterpreter transportInterpreter;
+  @DataPoint public static ElasticsearchInterpreter httpInterpreter;
+
   private static Client elsClient;
   private static Node elsNode;
-  private static ElasticsearchInterpreter interpreter;
 
   private static final String[] METHODS = { "GET", "PUT", "DELETE", "POST" };
   private static final int[] STATUS = { 200, 404, 500, 403 };
@@ -57,6 +67,8 @@ public class ElasticsearchInterpreterTest {
   private static final String ELS_HTTP_PORT = "10200";
   private static final String ELS_PATH = "/tmp/els";
 
+  private static final AtomicInteger deleteId = new AtomicInteger(2);
+
 
   @BeforeClass
   public static void populate() throws IOException {
@@ -80,7 +92,7 @@ public class ElasticsearchInterpreterTest {
           .endObject()
         .endObject().endObject().endObject()).get();
 
-    for (int i = 0; i < 50; i++) {
+    for (int i = 0; i < 48; i++) {
       elsClient.prepareIndex("logs", "http", "" + i)
         .setRefresh(true)
         .setSource(jsonBuilder()
@@ -97,22 +109,50 @@ public class ElasticsearchInterpreterTest {
         .get();
     }
 
+    for (int i = 1; i < 3; i++) {
+      elsClient.prepareIndex("logs", "http", "very/strange/id#" + i)
+        .setRefresh(true)
+        .setSource(jsonBuilder()
+            .startObject()
+              .field("date", new Date())
+              .startObject("request")
+                .field("method", METHODS[RandomUtils.nextInt(METHODS.length)])
+                .field("url", "/zeppelin/" + UUID.randomUUID().toString())
+                .field("headers", Arrays.asList("Accept: *.*", "Host: apache.org"))
+              .endObject()
+              .field("status", STATUS[RandomUtils.nextInt(STATUS.length)])
+              .field("content_length", RandomUtils.nextInt(2000))
+            )
+        .get();
+    }
+
     final Properties props = new Properties();
     props.put(ElasticsearchInterpreter.ELASTICSEARCH_HOST, ELS_HOST);
-    props.put(ElasticsearchInterpreter.ELASTICSEARCH_PORT, ELS_TRANSPORT_PORT);
     props.put(ElasticsearchInterpreter.ELASTICSEARCH_CLUSTER_NAME, ELS_CLUSTER_NAME);
-    interpreter = new ElasticsearchInterpreter(props);
-    interpreter.open();
+
+    props.put(ElasticsearchInterpreter.ELASTICSEARCH_PORT, ELS_TRANSPORT_PORT);
+    props.put(ElasticsearchInterpreter.ELASTICSEARCH_CLIENT_TYPE, "transport");
+    transportInterpreter = new ElasticsearchInterpreter(props);
+    transportInterpreter.open();
+
+    props.put(ElasticsearchInterpreter.ELASTICSEARCH_PORT, ELS_HTTP_PORT);
+    props.put(ElasticsearchInterpreter.ELASTICSEARCH_CLIENT_TYPE, "http");
+    httpInterpreter = new ElasticsearchInterpreter(props);
+    httpInterpreter.open();
   }
 
   @AfterClass
   public static void clean() {
-    if (interpreter != null) {
-      interpreter.close();
+    if (transportInterpreter != null) {
+      transportInterpreter.close();
+    }
+
+    if (httpInterpreter != null) {
+      httpInterpreter.close();
     }
 
     if (elsClient != null) {
-      elsClient.admin().indices().delete(new DeleteIndexRequest("logs")).actionGet();
+      elsClient.admin().indices().delete(new DeleteIndexRequest("*")).actionGet();
       elsClient.close();
     }
 
@@ -121,97 +161,140 @@ public class ElasticsearchInterpreterTest {
     }
   }
 
-  @Test
-  public void testCount() {
+  private InterpreterContext buildContext(String noteAndParagraphId) {
+    final AngularObjectRegistry angularObjReg = new AngularObjectRegistry("elasticsearch", null);
+    return new InterpreterContext(noteAndParagraphId, noteAndParagraphId, null, null, null, null, null,
+        null, angularObjReg , null, null, null);
+  }
+
+  @Theory
+  public void testCount(ElasticsearchInterpreter interpreter) {
+
+    final InterpreterContext ctx = buildContext("testCount");
 
-    InterpreterResult res = interpreter.interpret("count /unknown", null);
+    InterpreterResult res = interpreter.interpret("count /unknown", ctx);
     assertEquals(Code.ERROR, res.code());
 
-    res = interpreter.interpret("count /logs", null);
+    res = interpreter.interpret("count /logs", ctx);
+    assertEquals(Code.SUCCESS, res.code());
     assertEquals("50", res.message().get(0).getData());
+    assertNotNull(ctx.getAngularObjectRegistry().get("count_testCount", null, null));
+    assertEquals(50L, ctx.getAngularObjectRegistry().get("count_testCount", null, null).get());
+
+    res = interpreter.interpret("count /logs { \"query\": { \"match\": { \"status\": 500 } } }", ctx);
+    assertEquals(Code.SUCCESS, res.code());
   }
 
-  @Test
-  public void testGet() {
+  @Theory
+  public void testGet(ElasticsearchInterpreter interpreter) {
+
+    final InterpreterContext ctx = buildContext("get");
 
-    InterpreterResult res = interpreter.interpret("get /logs/http/unknown", null);
+    InterpreterResult res = interpreter.interpret("get /logs/http/unknown", ctx);
     assertEquals(Code.ERROR, res.code());
 
-    res = interpreter.interpret("get /logs/http/10", null);
+    res = interpreter.interpret("get /logs/http/unknown/unknown", ctx);
+    assertEquals(Code.ERROR, res.code());
+
+    res = interpreter.interpret("get /unknown/unknown/unknown", ctx);
+    assertEquals(Code.ERROR, res.code());
+
+    res = interpreter.interpret("get /logs/http/very/strange/id#1", ctx);
+    assertEquals(Code.SUCCESS, res.code());
+
+    res = interpreter.interpret("get /logs/http/4", ctx);
+    assertEquals(Code.SUCCESS, res.code());
+
+    res = interpreter.interpret("get /logs/_all/4", ctx);
     assertEquals(Code.SUCCESS, res.code());
   }
 
-  @Test
-  public void testSearch() {
+  @Theory
+  public void testSearch(ElasticsearchInterpreter interpreter) {
+
+    final InterpreterContext ctx = buildContext("search");
 
-    InterpreterResult res = interpreter.interpret("size 10\nsearch /logs *", null);
+    InterpreterResult res = interpreter.interpret("size 10\nsearch /logs *", ctx);
     assertEquals(Code.SUCCESS, res.code());
 
-    res = interpreter.interpret("search /logs {{{hello}}}", null);
+    res = interpreter.interpret("search /logs {{{hello}}}", ctx);
     assertEquals(Code.ERROR, res.code());
 
-    res = interpreter.interpret("search /logs { \"query\": { \"match\": { \"status\": 500 } } }", null);
+    res = interpreter.interpret("search /logs { \"query\": { \"match\": { \"status\": 500 } } }", ctx);
     assertEquals(Code.SUCCESS, res.code());
 
-    res = interpreter.interpret("search /logs status:404", null);
+    res = interpreter.interpret("search /logs status:404", ctx);
     assertEquals(Code.SUCCESS, res.code());
 
-    res = interpreter.interpret("search /logs { \"fields\": [ \"date\", \"request.headers\" ], \"query\": { \"match\": { \"status\": 500 } } }", null);
+    res = interpreter.interpret("search /logs { \"fields\": [ \"date\", \"request.headers\" ], \"query\": { \"match\": { \"status\": 500 } } }", ctx);
     assertEquals(Code.SUCCESS, res.code());
   }
 
-  @Test
-  public void testAgg() {
+  @Theory
+  public void testAgg(ElasticsearchInterpreter interpreter) {
+
+    final InterpreterContext ctx = buildContext("agg");
 
     // Single-value metric
     InterpreterResult res = interpreter.interpret("search /logs { \"aggs\" : { \"distinct_status_count\" : " +
-            " { \"cardinality\" : { \"field\" : \"status\" } } } }", null);
+            " { \"cardinality\" : { \"field\" : \"status\" } } } }", ctx);
     assertEquals(Code.SUCCESS, res.code());
 
     // Multi-value metric
     res = interpreter.interpret("search /logs { \"aggs\" : { \"content_length_stats\" : " +
-            " { \"extended_stats\" : { \"field\" : \"content_length\" } } } }", null);
+            " { \"extended_stats\" : { \"field\" : \"content_length\" } } } }", ctx);
     assertEquals(Code.SUCCESS, res.code());
 
     // Single bucket
     res = interpreter.interpret("search /logs { \"aggs\" : { " +
             " \"200_OK\" : { \"filter\" : { \"term\": { \"status\": \"200\" } }, " +
-            "   \"aggs\" : { \"avg_length\" : { \"avg\" : { \"field\" : \"content_length\" } } } } } }", null);
+            "   \"aggs\" : { \"avg_length\" : { \"avg\" : { \"field\" : \"content_length\" } } } } } }", ctx);
     assertEquals(Code.SUCCESS, res.code());
 
     // Multi-buckets
     res = interpreter.interpret("search /logs { \"aggs\" : { \"status_count\" : " +
-            " { \"terms\" : { \"field\" : \"status\" } } } }", null);
+            " { \"terms\" : { \"field\" : \"status\" } } } }", ctx);
     assertEquals(Code.SUCCESS, res.code());
-    
+
     res = interpreter.interpret("search /logs { \"aggs\" : { " +
             " \"length\" : { \"terms\": { \"field\": \"status\" }, " +
-            "   \"aggs\" : { \"sum_length\" : { \"sum\" : { \"field\" : \"content_length\" } }, \"sum_status\" : { \"sum\" : { \"field\" : \"status\" } } } } } }", null);
+            "   \"aggs\" : { \"sum_length\" : { \"sum\" : { \"field\" : \"content_length\" } }, \"sum_status\" : { \"sum\" : { \"field\" : \"status\" } } } } } }", ctx);
     assertEquals(Code.SUCCESS, res.code());
   }
 
-  @Test
-  public void testIndex() {
+  @Theory
+  public void testIndex(ElasticsearchInterpreter interpreter) {
 
     InterpreterResult res = interpreter.interpret("index /logs { \"date\": \"" + new Date() + "\", \"method\": \"PUT\", \"status\": \"500\" }", null);
     assertEquals(Code.ERROR, res.code());
 
+    res = interpreter.interpret("index /logs/http { bad ", null);
+    assertEquals(Code.ERROR, res.code());
+
     res = interpreter.interpret("index /logs/http { \"date\": \"2015-12-06T14:54:23.368Z\", \"method\": \"PUT\", \"status\": \"500\" }", null);
     assertEquals(Code.SUCCESS, res.code());
+
+    res = interpreter.interpret("index /logs/http/1000 { \"date\": \"2015-12-06T14:54:23.368Z\", \"method\": \"PUT\", \"status\": \"500\" }", null);
+    assertEquals(Code.SUCCESS, res.code());
   }
 
-  @Test
-  public void testDelete() {
+  @Theory
+  public void testDelete(ElasticsearchInterpreter interpreter) {
 
     InterpreterResult res = interpreter.interpret("delete /logs/http/unknown", null);
     assertEquals(Code.ERROR, res.code());
 
-    res = interpreter.interpret("delete /logs/http/11", null);
-    assertEquals("11", res.message().get(0).getData());
+    res = interpreter.interpret("delete /unknown/unknown/unknown", null);
+    assertEquals(Code.ERROR, res.code());
+
+    final int testDeleteId = deleteId.decrementAndGet();
+    res = interpreter.interpret("delete /logs/http/" + testDeleteId, null);
+    assertEquals(Code.SUCCESS, res.code());
+    assertEquals("" + testDeleteId, res.message().get(0).getData());
   }
 
-  @Test
-  public void testMisc() {
+  @Theory
+  public void testMisc(ElasticsearchInterpreter interpreter) {
 
     InterpreterResult res = interpreter.interpret(null, null);
     assertEquals(Code.SUCCESS, res.code());
@@ -220,23 +303,23 @@ public class ElasticsearchInterpreterTest {
     assertEquals(Code.SUCCESS, res.code());
   }
 
-  @Test
-  public void testCompletion() {
-    List expectedResultOne = Arrays.asList(new InterpreterCompletion("count", "count"));
-    List expectedResultTwo = Arrays.asList(new InterpreterCompletion("help", "help"));
+  @Theory
+  public void testCompletion(ElasticsearchInterpreter interpreter) {
+    final List<InterpreterCompletion> expectedResultOne = Arrays.asList(new InterpreterCompletion("count", "count"));
+    final List<InterpreterCompletion> expectedResultTwo = Arrays.asList(new InterpreterCompletion("help", "help"));
 
-    List<InterpreterCompletion> resultOne = interpreter.completion("co", 0);
-    List<InterpreterCompletion> resultTwo = interpreter.completion("he", 0);
-    List<InterpreterCompletion> resultAll = interpreter.completion("", 0);
+    final List<InterpreterCompletion> resultOne = interpreter.completion("co", 0);
+    final List<InterpreterCompletion> resultTwo = interpreter.completion("he", 0);
+    final List<InterpreterCompletion> resultAll = interpreter.completion("", 0);
 
     Assert.assertEquals(expectedResultOne, resultOne);
     Assert.assertEquals(expectedResultTwo, resultTwo);
 
-    List allCompletionList = new ArrayList<>();
-    for (InterpreterCompletion ic : resultAll) {
+    final List<String> allCompletionList = new ArrayList<>();
+    for (final InterpreterCompletion ic : resultAll) {
       allCompletionList.add(ic.getName());
     }
-    Assert.assertEquals(interpreter.COMMANDS, allCompletionList);
+    Assert.assertEquals(ElasticsearchInterpreter.COMMANDS, allCompletionList);
   }
 
 }

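Note on ids with special characters: the tests above index documents with ids such as "very/strange/id#1" and then fetch them through both interpreters. Over HTTP, an id like this cannot be placed in a URL path as-is, because "/" would be read as a path separator and "#" as the start of a fragment. The HTTP client code is not part of this section, so the following is only a minimal sketch of one way to percent-encode such an id; IdEncodingSketch, encodeId and documentPath are hypothetical names, and the /index/type/id path layout is assumed from the commands used in the tests.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper, not the HTTP client's actual code.
final class IdEncodingSketch {

  // Percent-encodes a document id so it can be used as one URL path segment.
  static String encodeId(String id) {
    try {
      // URLEncoder implements form encoding, where a space becomes '+'.
      // In a URL path a space must be %20, so that case is patched here.
      // A literal '+' in the id is already emitted as %2B and is not affected.
      return URLEncoder.encode(id, "UTF-8").replace("+", "%20");
    } catch (UnsupportedEncodingException e) {
      // UTF-8 is a charset every Java platform is required to support.
      throw new IllegalStateException(e);
    }
  }

  // Builds a document path of the form /index/type/encoded-id.
  static String documentPath(String index, String type, String id) {
    return "/" + index + "/" + type + "/" + encodeId(id);
  }

  public static void main(String[] args) {
    // prints /logs/http/very%2Fstrange%2Fid%231
    System.out.println(documentPath("logs", "http", "very/strange/id#1"));
  }
}
```

The sketch sticks to the two-argument URLEncoder.encode overload so that it also compiles on Java 7.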
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/zeppelin-distribution/src/bin_license/LICENSE
----------------------------------------------------------------------
diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE
index 1acdf58..d27f27d 100644
--- a/zeppelin-distribution/src/bin_license/LICENSE
+++ b/zeppelin-distribution/src/bin_license/LICENSE
@@ -13,6 +13,7 @@ The following components are provided under Apache License.
     (Apache 2.0) Apache Commons Exec (commons-exec:commons-exec:1.3 - http://commons.apache.org/exec/)
     (Apache 2.0) Http Components (org.apache.httpcomponents:httpcore:4.3.3 - https://github.com/apache/httpclient)
     (Apache 2.0) Http Components (org.apache.httpcomponents:httpclient:4.3.6 - https://github.com/apache/httpclient)
+    (Apache 2.0) Http Components (org.apache.httpcomponents:httpasyncclient:4.0.2 - https://github.com/apache/httpclient)
     (Apache 2.0) Apache Commons Lang (org.apache.commons:commons-lang:2.5 - http://commons.apache.org/proper/commons-lang/)
     (Apache 2.0) Apache Commons Lang 3 (org.apache.commons:commons-lang3:3.4 - http://commons.apache.org/proper/commons-lang/)
     (Apache 2.0) Apache Commons Math 3 (org.apache.commons:commons-math3:3.6.1 - http://commons.apache.org/proper/commons-math/)
@@ -266,6 +267,7 @@ The text of each license is also included at licenses/LICENSE-[project]-[version
     (The MIT License) Java String Similarity 0.12 (info.debatty:java-string-similarity:0.12 - https://github.com/tdebatty/java-string-similarity)
     (The MIT License) Java LSH 0.10 (info.debatty:java-lsh:0.10 - https://github.com/tdebatty/java-LSH)
     (The MIT License) JSoup 1.6.1 (org.jsoup:jsoup:1.6.1 - https://github.com/jhy/jsoup/)
+    (The MIT License) Unirest 1.4.9 (com.mashape.unirest:unirest-java:1.4.9 - https://github.com/Mashape/unirest-java)
     (The MIT License) ngclipboard v1.1.1 (https://github.com/sachinchoolur/ngclipboard) - https://github.com/sachinchoolur/ngclipboard/blob/1.1.1/LICENSE)
 
 ========================================================================