Posted to commits@zeppelin.apache.org by mo...@apache.org on 2017/01/29 23:04:51 UTC
zeppelin git commit: [ZEPPELIN-1821] Add HTTP client to elasticsearch interpreter
Repository: zeppelin
Updated Branches:
refs/heads/master 940a8b7d3 -> e763b3bf3
[ZEPPELIN-1821] Add HTTP client to elasticsearch interpreter
### What is this PR for?
Add HTTP client to elasticsearch interpreter.
### What type of PR is it?
Feature
### Todos
* [X] - Source code
* [X] - Tests
* [X] - License
* [X] - Docs
### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1821
### How should this be tested?
* Start an Elasticsearch node
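* Configure the elasticsearch interpreter to use the HTTP client (set `elasticsearch.client.type` to `http` and `elasticsearch.port` to the node's HTTP port)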
* Create queries in a note using elasticsearch
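
As a rough illustration of the configuration step above, the sketch below mirrors how the interpreter's `open()` method in this commit picks a client from `elasticsearch.client.type`: an empty or missing value falls back to `transport`, and the value is lower-cased before comparison. The class `ClientTypeSketch`, its method names, and the trimming of the value are hypothetical and not part of the commit. The commit logs an error for an unknown type, whereas this sketch throws. The HTTP port 9200 is an assumption based on Elasticsearch's usual default, while 9300 is the transport default shown in the docs.

```java
import java.util.Locale;
import java.util.Properties;

/** Hypothetical helper for illustration only; not part of the commit. */
final class ClientTypeSketch {

  // 9300 is the documented transport default; 9200 is assumed to be the node's HTTP port.
  static final int DEFAULT_TRANSPORT_PORT = 9300;
  static final int DEFAULT_HTTP_PORT = 9200;

  /** Returns "transport" or "http"; a missing or empty value falls back to "transport". */
  static String resolveClientType(Properties props) {
    String type = props.getProperty("elasticsearch.client.type");
    if (type == null || type.trim().isEmpty()) {
      return "transport";
    }
    type = type.trim().toLowerCase(Locale.ROOT);
    if ("transport".equals(type) || "http".equals(type)) {
      return type;
    }
    // The commit logs an error here instead; throwing is a choice made for this sketch.
    throw new IllegalArgumentException("Unknown type of Elasticsearch client: " + type);
  }

  /** Suggests a port for the given client type (assumed defaults, see above). */
  static int defaultPortFor(String clientType) {
    return "http".equals(clientType) ? DEFAULT_HTTP_PORT : DEFAULT_TRANSPORT_PORT;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("elasticsearch.host", "localhost");
    props.setProperty("elasticsearch.client.type", "HTTP");

    String type = resolveClientType(props);
    System.out.println(type + " client, suggested port " + defaultPortFor(type));
  }
}
```

When testing by hand, the main point is that the port has to match the client type: a transport port with the `http` client (or the reverse) will not connect.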
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? Yes
* Are there breaking changes for older versions? No
* Does this need documentation? Yes
Author: Bruno Bonnin <bb...@gmail.com>
Author: Bruno Bonnin <br...@myscript.com>
Closes #1902 from bbonnin/master and squashes the following commits:
f5a539e [Bruno Bonnin] Remove commented code lines
86153a8 [Bruno Bonnin] Merge remote-tracking branch 'upstream/master'
2e1bbbd [Bruno Bonnin] Merge remote-tracking branch 'upstream/master'
19e888e [Bruno Bonnin] Remove bad code in test
523d155 [Bruno Bonnin] Replace Java 8 methods
6bcf369 [Bruno Bonnin] Fix issue with id containing special chars (/, #)
4e9812e [Bruno Bonnin] Merge elasticsearch/pom.xml
5a96ae0 [Bruno Bonnin] Merge branch 'master' into master
e2365fb [Bruno Bonnin] Update elasticsearch/pom.xml
28b9805 [Bruno Bonnin] Update img
549db39 [Bruno Bonnin] Add HTTP client to elasticsearch interpreter
f4c5ac3 [Bruno Bonnin] HTTP-based Elasticsearch client
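
The fix for ids containing special characters (6bcf369) appears to correspond to the `getIndexTypeId` helper in the diff, which treats every URL segment after the type as part of the id and joins the segments back with `/`. A minimal sketch of that idea follows. The class `IndexTypeIdSketch` and the way the URL is split in `main` are assumptions for illustration, and `String.join` stands in for the `StringUtils.join` call used in the commit.

```java
import java.util.Arrays;

/** Hypothetical illustration of the id handling; not the commit's code. */
final class IndexTypeIdSketch {

  /**
   * Splits URL items into {index, type, id}. Everything after the second
   * item is joined with '/' so that ids such as "a/b#c" survive intact.
   * Returns null when the URL does not have the form /index/type/id.
   */
  static String[] indexTypeId(String[] urlItems) {
    if (urlItems == null || urlItems.length < 3) {
      return null;
    }
    String index = urlItems[0];
    String type = urlItems[1];
    String id = String.join("/", Arrays.copyOfRange(urlItems, 2, urlItems.length));
    if (index.isEmpty() || type.isEmpty() || id.isEmpty()) {
      return null;
    }
    return new String[] {index, type, id};
  }

  public static void main(String[] args) {
    // Assumption: the interpreter has already split the URL on '/'.
    String[] urlItems = "logs/request/a/b#c".split("/");
    String[] result = indexTypeId(urlItems);
    System.out.println(Arrays.toString(result));
  }
}
```

The previous check required exactly three URL items, so an id containing `/` was rejected as a bad URL.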
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/e763b3bf
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/e763b3bf
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/e763b3bf
Branch: refs/heads/master
Commit: e763b3bf3e8a26a2e2134bc615aac1bff59cd82d
Parents: 940a8b7
Author: Bruno Bonnin <bb...@gmail.com>
Authored: Sat Jan 28 10:16:43 2017 +0100
Committer: Lee moon soo <mo...@apache.org>
Committed: Mon Jan 30 08:04:49 2017 +0900
----------------------------------------------------------------------
.../img/docs-img/elasticsearch-config.png | Bin 150536 -> 55656 bytes
docs/interpreter/elasticsearch.md | 17 +-
elasticsearch/pom.xml | 15 +-
.../elasticsearch/ElasticsearchInterpreter.java | 370 ++++++++++--------
.../elasticsearch/action/ActionException.java | 32 ++
.../elasticsearch/action/ActionResponse.java | 78 ++++
.../elasticsearch/action/AggWrapper.java | 43 +++
.../elasticsearch/action/HitWrapper.java | 67 ++++
.../client/ElasticsearchClient.java | 36 ++
.../elasticsearch/client/HttpBasedClient.java | 372 +++++++++++++++++++
.../client/TransportBasedClient.java | 235 ++++++++++++
.../src/main/resources/interpreter-setting.json | 18 +
.../ElasticsearchInterpreterTest.java | 183 ++++++---
zeppelin-distribution/src/bin_license/LICENSE | 2 +
14 files changed, 1251 insertions(+), 217 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-config.png
----------------------------------------------------------------------
diff --git a/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-config.png b/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-config.png
index b5f7dda..54a634a 100644
Binary files a/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-config.png and b/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-config.png differ
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/docs/interpreter/elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/elasticsearch.md b/docs/interpreter/elasticsearch.md
index 7f3fb27..165116b 100644
--- a/docs/interpreter/elasticsearch.md
+++ b/docs/interpreter/elasticsearch.md
@@ -46,7 +46,22 @@ limitations under the License.
<tr>
<td>elasticsearch.port</td>
<td>9300</td>
- <td>Connection port <b>( Important: this is not the HTTP port, but the transport port )</b></td>
+ <td>Connection port <b>( Important: it depends on the client type, transport or http)</b></td>
+ </tr>
+ <tr>
+ <td>elasticsearch.client.type</td>
+ <td>transport</td>
+ <td>The type of client for Elasticsearch (transport or http) <b>( Important: the port depends on this value)</b></td>
+ </tr>
+ <tr>
+ <td>elasticsearch.basicauth.username</td>
+ <td></td>
+ <td>Username for basic authentication (http)</td>
+ </tr>
+ <tr>
+ <td>elasticsearch.basicauth.password</td>
+ <td></td>
+ <td>Password for basic authentication (http)</td>
</tr>
<tr>
<td>elasticsearch.result.size</td>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml
index 6f09c7d..6042a14 100644
--- a/elasticsearch/pom.xml
+++ b/elasticsearch/pom.xml
@@ -26,7 +26,6 @@
<relativePath>..</relativePath>
</parent>
- <groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-elasticsearch</artifactId>
<packaging>jar</packaging>
<version>0.8.0-SNAPSHOT</version>
@@ -34,8 +33,10 @@
<properties>
<elasticsearch.version>2.4.3</elasticsearch.version>
+ <httpasyncclient.version>4.0.2</httpasyncclient.version>
<guava.version>18.0</guava.version>
<json-flattener.version>0.1.6</json-flattener.version>
+ <unirest.version>1.4.9</unirest.version>
</properties>
<dependencies>
@@ -51,6 +52,12 @@
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpasyncclient</artifactId>
+ <version>${httpasyncclient.version}</version>
+ </dependency>
<dependency>
<groupId>com.google.guava</groupId>
@@ -65,6 +72,12 @@
</dependency>
<dependency>
+ <groupId>com.mashape.unirest</groupId>
+ <artifactId>unirest-java</artifactId>
+ <version>${unirest.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java
index 549b5f2..e3918e4 100644
--- a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java
+++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java
@@ -18,7 +18,6 @@
package org.apache.zeppelin.elasticsearch;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -33,27 +32,20 @@ import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.elasticsearch.action.ActionResponse;
+import org.apache.zeppelin.elasticsearch.action.AggWrapper;
+import org.apache.zeppelin.elasticsearch.action.HitWrapper;
+import org.apache.zeppelin.elasticsearch.client.ElasticsearchClient;
+import org.apache.zeppelin.elasticsearch.client.HttpBasedClient;
+import org.apache.zeppelin.elasticsearch.client.TransportBasedClient;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.elasticsearch.action.delete.DeleteResponse;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.action.search.SearchAction;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
@@ -66,7 +58,7 @@ import org.slf4j.LoggerFactory;
import com.github.wnameless.json.flattener.JsonFlattener;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
-import com.google.gson.JsonParseException;
+import com.google.gson.JsonObject;
/**
@@ -77,75 +69,82 @@ public class ElasticsearchInterpreter extends Interpreter {
private static Logger logger = LoggerFactory.getLogger(ElasticsearchInterpreter.class);
private static final String HELP = "Elasticsearch interpreter:\n"
- + "General format: <command> /<indices>/<types>/<id> <option> <JSON>\n"
- + " - indices: list of indices separated by commas (depends on the command)\n"
- + " - types: list of document types separated by commas (depends on the command)\n"
- + "Commands:\n"
- + " - search /indices/types <query>\n"
- + " . indices and types can be omitted (at least, you have to provide '/')\n"
- + " . a query is either a JSON-formatted query, nor a lucene query\n"
- + " - size <value>\n"
- + " . defines the size of the result set (default value is in the config)\n"
- + " . if used, this command must be declared before a search command\n"
- + " - count /indices/types <query>\n"
- + " . same comments as for the search\n"
- + " - get /index/type/id\n"
- + " - delete /index/type/id\n"
- + " - index /ndex/type/id <json-formatted document>\n"
- + " . the id can be omitted, elasticsearch will generate one";
+ + "General format: <command> /<indices>/<types>/<id> <option> <JSON>\n"
+ + " - indices: list of indices separated by commas (depends on the command)\n"
+ + " - types: list of document types separated by commas (depends on the command)\n"
+ + "Commands:\n"
+ + " - search /indices/types <query>\n"
+ + " . indices and types can be omitted (at least, you have to provide '/')\n"
+ + " . a query is either a JSON-formatted query or a Lucene query\n"
+ + " - size <value>\n"
+ + " . defines the size of the result set (default value is in the config)\n"
+ + " . if used, this command must be declared before a search command\n"
+ + " - count /indices/types <query>\n"
+ + " . same comments as for the search\n"
+ + " - get /index/type/id\n"
+ + " - delete /index/type/id\n"
+ + " - index /index/type/id <json-formatted document>\n"
+ + " . the id can be omitted, elasticsearch will generate one";
protected static final List<String> COMMANDS = Arrays.asList(
- "count", "delete", "get", "help", "index", "search");
+ "count", "delete", "get", "help", "index", "search");
private static final Pattern FIELD_NAME_PATTERN = Pattern.compile("\\[\\\\\"(.+)\\\\\"\\](.*)");
public static final String ELASTICSEARCH_HOST = "elasticsearch.host";
public static final String ELASTICSEARCH_PORT = "elasticsearch.port";
+ public static final String ELASTICSEARCH_CLIENT_TYPE = "elasticsearch.client.type";
public static final String ELASTICSEARCH_CLUSTER_NAME = "elasticsearch.cluster.name";
public static final String ELASTICSEARCH_RESULT_SIZE = "elasticsearch.result.size";
+ public static final String ELASTICSEARCH_BASIC_AUTH_USERNAME = "elasticsearch.basicauth.username";
+ public static final String ELASTICSEARCH_BASIC_AUTH_PASSWORD = "elasticsearch.basicauth.password";
private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
- private Client client;
- private String host = "localhost";
- private int port = 9300;
- private String clusterName = "elasticsearch";
+ private ElasticsearchClient elsClient;
private int resultSize = 10;
public ElasticsearchInterpreter(Properties property) {
super(property);
- this.host = getProperty(ELASTICSEARCH_HOST);
- this.port = Integer.parseInt(getProperty(ELASTICSEARCH_PORT));
- this.clusterName = getProperty(ELASTICSEARCH_CLUSTER_NAME);
+
+ }
+
+ @Override
+ public void open() {
+ logger.info("Properties: {}", getProperty());
+
+ String clientType = getProperty(ELASTICSEARCH_CLIENT_TYPE);
+ clientType = clientType == null ? null : clientType.toLowerCase();
+
try {
this.resultSize = Integer.parseInt(getProperty(ELASTICSEARCH_RESULT_SIZE));
- } catch (NumberFormatException e) {
+ }
+ catch (final NumberFormatException e) {
this.resultSize = 10;
logger.error("Unable to parse " + ELASTICSEARCH_RESULT_SIZE + " : " +
- property.get(ELASTICSEARCH_RESULT_SIZE), e);
+ property.get(ELASTICSEARCH_RESULT_SIZE), e);
}
- }
- @Override
- public void open() {
try {
- logger.info("prop={}", getProperty());
- final Settings settings = Settings.settingsBuilder()
- .put("cluster.name", clusterName)
- .put(getProperty())
- .build();
- client = TransportClient.builder().settings(settings).build()
- .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
- }
- catch (IOException e) {
+ if (StringUtils.isEmpty(clientType) || "transport".equals(clientType)) {
+ elsClient = new TransportBasedClient(getProperty());
+ }
+ else if ("http".equals(clientType)) {
+ elsClient = new HttpBasedClient(getProperty());
+ }
+ else {
+ logger.error("Unknown type of Elasticsearch client: " + clientType);
+ }
+ }
+ catch (final IOException e) {
logger.error("Open connection with Elasticsearch", e);
}
}
@Override
public void close() {
- if (client != null) {
- client.close();
+ if (elsClient != null) {
+ elsClient.close();
}
}
@@ -159,7 +158,7 @@ public class ElasticsearchInterpreter extends Interpreter {
int currentResultSize = resultSize;
- if (client == null) {
+ if (elsClient == null) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
"Problem with the Elasticsearch client, please check your configuration (host, port,...)");
}
@@ -178,7 +177,7 @@ public class ElasticsearchInterpreter extends Interpreter {
if (lines.length < 2) {
return processHelp(InterpreterResult.Code.ERROR,
- "Size cmd must be followed by a search");
+ "Size cmd must be followed by a search");
}
final String[] sizeLine = StringUtils.split(lines[0], " ", 2);
@@ -202,13 +201,13 @@ public class ElasticsearchInterpreter extends Interpreter {
try {
if ("get".equalsIgnoreCase(method)) {
- return processGet(urlItems);
+ return processGet(urlItems, interpreterContext);
}
else if ("count".equalsIgnoreCase(method)) {
- return processCount(urlItems, data);
+ return processCount(urlItems, data, interpreterContext);
}
else if ("search".equalsIgnoreCase(method)) {
- return processSearch(urlItems, data, currentResultSize);
+ return processSearch(urlItems, data, currentResultSize, interpreterContext);
}
else if ("index".equalsIgnoreCase(method)) {
return processIndex(urlItems, data);
@@ -219,7 +218,7 @@ public class ElasticsearchInterpreter extends Interpreter {
return processHelp(InterpreterResult.Code.ERROR, "Unknown command");
}
- catch (Exception e) {
+ catch (final Exception e) {
return new InterpreterResult(InterpreterResult.Code.ERROR, "Error : " + e.getMessage());
}
}
@@ -243,7 +242,7 @@ public class ElasticsearchInterpreter extends Interpreter {
public List<InterpreterCompletion> completion(String s, int i) {
final List suggestions = new ArrayList<>();
- for (String cmd : COMMANDS) {
+ for (final String cmd : COMMANDS) {
if (cmd.toLowerCase().contains(s)) {
suggestions.add(new InterpreterCompletion(cmd, cmd));
}
@@ -251,6 +250,31 @@ public class ElasticsearchInterpreter extends Interpreter {
return suggestions;
}
+ private void addAngularObject(InterpreterContext interpreterContext, String prefix, Object obj) {
+ interpreterContext.getAngularObjectRegistry().add(
+ prefix + "_" + interpreterContext.getParagraphId().replace("-", "_"),
+ obj, null, null);
+ }
+
+ private String[] getIndexTypeId(String[] urlItems) {
+
+ if (urlItems.length < 3) {
+ return null;
+ }
+
+ final String index = urlItems[0];
+ final String type = urlItems[1];
+ final String id = StringUtils.join(Arrays.copyOfRange(urlItems, 2, urlItems.length), '/');
+
+ if (StringUtils.isEmpty(index)
+ || StringUtils.isEmpty(type)
+ || StringUtils.isEmpty(id)) {
+ return null;
+ }
+
+ return new String[] { index, type, id };
+ }
+
private InterpreterResult processHelp(InterpreterResult.Code code, String additionalMessage) {
final StringBuffer buffer = new StringBuffer();
@@ -267,28 +291,30 @@ public class ElasticsearchInterpreter extends Interpreter {
* Processes a "get" request.
*
* @param urlItems Items of the URL
+ * @param interpreterContext Instance of the context
* @return Result of the get request, it contains a JSON-formatted string
*/
- private InterpreterResult processGet(String[] urlItems) {
+ private InterpreterResult processGet(String[] urlItems, InterpreterContext interpreterContext) {
+
+ final String[] indexTypeId = getIndexTypeId(urlItems);
- if (urlItems.length != 3
- || StringUtils.isEmpty(urlItems[0])
- || StringUtils.isEmpty(urlItems[1])
- || StringUtils.isEmpty(urlItems[2])) {
+ if (indexTypeId == null) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
- "Bad URL (it should be /index/type/id)");
+ "Bad URL (it should be /index/type/id)");
}
- final GetResponse response = client
- .prepareGet(urlItems[0], urlItems[1], urlItems[2])
- .get();
- if (response.isExists()) {
- final String json = gson.toJson(response.getSource());
+ final ActionResponse response = elsClient.get(indexTypeId[0], indexTypeId[1], indexTypeId[2]);
+
+ if (response.isSucceeded()) {
+ final JsonObject json = response.getHit().getSourceAsJsonObject();
+ final String jsonStr = gson.toJson(json);
+
+ addAngularObject(interpreterContext, "get", json);
return new InterpreterResult(
- InterpreterResult.Code.SUCCESS,
- InterpreterResult.Type.TEXT,
- json);
+ InterpreterResult.Code.SUCCESS,
+ InterpreterResult.Type.TEXT,
+ jsonStr);
}
return new InterpreterResult(InterpreterResult.Code.ERROR, "Document not found");
@@ -299,21 +325,25 @@ public class ElasticsearchInterpreter extends Interpreter {
*
* @param urlItems Items of the URL
* @param data May contains the JSON of the request
+ * @param interpreterContext Instance of the context
* @return Result of the count request, it contains the total hits
*/
- private InterpreterResult processCount(String[] urlItems, String data) {
+ private InterpreterResult processCount(String[] urlItems, String data,
+ InterpreterContext interpreterContext) {
if (urlItems.length > 2) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
- "Bad URL (it should be /index1,index2,.../type1,type2,...)");
+ "Bad URL (it should be /index1,index2,.../type1,type2,...)");
}
- final SearchResponse response = searchData(urlItems, data, 0);
+ final ActionResponse response = searchData(urlItems, data, 0);
+
+ addAngularObject(interpreterContext, "count", response.getTotalHits());
return new InterpreterResult(
- InterpreterResult.Code.SUCCESS,
- InterpreterResult.Type.TEXT,
- "" + response.getHits().getTotalHits());
+ InterpreterResult.Code.SUCCESS,
+ InterpreterResult.Type.TEXT,
+ "" + response.getTotalHits());
}
/**
@@ -322,16 +352,22 @@ public class ElasticsearchInterpreter extends Interpreter {
* @param urlItems Items of the URL
* @param data May contains the JSON of the request
* @param size Limit of result set
+ * @param interpreterContext Instance of the context
* @return Result of the search request, it contains a tab-formatted string of the matching hits
*/
- private InterpreterResult processSearch(String[] urlItems, String data, int size) {
+ private InterpreterResult processSearch(String[] urlItems, String data, int size,
+ InterpreterContext interpreterContext) {
if (urlItems.length > 2) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
- "Bad URL (it should be /index1,index2,.../type1,type2,...)");
+ "Bad URL (it should be /index1,index2,.../type1,type2,...)");
}
- final SearchResponse response = searchData(urlItems, data, size);
+ final ActionResponse response = searchData(urlItems, data, size);
+
+ addAngularObject(interpreterContext, "search",
+ (response.getAggregations() != null && response.getAggregations().size() > 0) ?
+ response.getAggregations() : response.getHits());
return buildResponseMessage(response);
}
@@ -347,18 +383,16 @@ public class ElasticsearchInterpreter extends Interpreter {
if (urlItems.length < 2 || urlItems.length > 3) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
- "Bad URL (it should be /index/type or /index/type/id)");
+ "Bad URL (it should be /index/type or /index/type/id)");
}
- final IndexResponse response = client
- .prepareIndex(urlItems[0], urlItems[1], urlItems.length == 2 ? null : urlItems[2])
- .setSource(data)
- .get();
+ final ActionResponse response = elsClient.index(
+ urlItems[0], urlItems[1], urlItems.length == 2 ? null : urlItems[2], data);
return new InterpreterResult(
- InterpreterResult.Code.SUCCESS,
- InterpreterResult.Type.TEXT,
- response.getId());
+ InterpreterResult.Code.SUCCESS,
+ InterpreterResult.Type.TEXT,
+ response.getHit().getId());
}
/**
@@ -369,59 +403,39 @@ public class ElasticsearchInterpreter extends Interpreter {
*/
private InterpreterResult processDelete(String[] urlItems) {
- if (urlItems.length != 3
- || StringUtils.isEmpty(urlItems[0])
- || StringUtils.isEmpty(urlItems[1])
- || StringUtils.isEmpty(urlItems[2])) {
+ final String[] indexTypeId = getIndexTypeId(urlItems);
+
+ if (indexTypeId == null) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
- "Bad URL (it should be /index/type/id)");
+ "Bad URL (it should be /index/type/id)");
}
- final DeleteResponse response = client
- .prepareDelete(urlItems[0], urlItems[1], urlItems[2])
- .get();
+ final ActionResponse response =
+ elsClient.delete(indexTypeId[0], indexTypeId[1], indexTypeId[2]);
- if (response.isFound()) {
+ if (response.isSucceeded()) {
return new InterpreterResult(
- InterpreterResult.Code.SUCCESS,
- InterpreterResult.Type.TEXT,
- response.getId());
+ InterpreterResult.Code.SUCCESS,
+ InterpreterResult.Type.TEXT,
+ response.getHit().getId());
}
return new InterpreterResult(InterpreterResult.Code.ERROR, "Document not found");
}
- private SearchResponse searchData(String[] urlItems, String query, int size) {
+ private ActionResponse searchData(String[] urlItems, String query, int size) {
- final SearchRequestBuilder reqBuilder = new SearchRequestBuilder(
- client, SearchAction.INSTANCE);
- reqBuilder.setIndices();
+ String[] indices = null;
+ String[] types = null;
if (urlItems.length >= 1) {
- reqBuilder.setIndices(StringUtils.split(urlItems[0], ","));
+ indices = StringUtils.split(urlItems[0], ",");
}
if (urlItems.length > 1) {
- reqBuilder.setTypes(StringUtils.split(urlItems[1], ","));
+ types = StringUtils.split(urlItems[1], ",");
}
- if (!StringUtils.isEmpty(query)) {
- // The query can be either JSON-formatted, nor a Lucene query
- // So, try to parse as a JSON => if there is an error, consider the query a Lucene one
- try {
- final Map source = gson.fromJson(query, Map.class);
- reqBuilder.setExtraSource(source);
- }
- catch (JsonParseException e) {
- // This is not a JSON (or maybe not well formatted...)
- reqBuilder.setQuery(QueryBuilders.queryStringQuery(query).analyzeWildcard(true));
- }
- }
-
- reqBuilder.setSize(size);
-
- final SearchResponse response = reqBuilder.get();
-
- return response;
+ return elsClient.search(indices, types, query, size);
}
private InterpreterResult buildAggResponseMessage(Aggregations aggregations) {
@@ -442,8 +456,8 @@ public class ElasticsearchInterpreter extends Interpreter {
final Set<String> headerKeys = new HashSet<>();
final List<Map<String, Object>> buckets = new LinkedList<>();
final InternalMultiBucketAggregation multiBucketAgg = (InternalMultiBucketAggregation) agg;
-
- for (MultiBucketsAggregation.Bucket bucket : multiBucketAgg.getBuckets()) {
+
+ for (final MultiBucketsAggregation.Bucket bucket : multiBucketAgg.getBuckets()) {
try {
final XContentBuilder builder = XContentFactory.jsonBuilder();
bucket.toXContent(builder, null);
@@ -451,22 +465,22 @@ public class ElasticsearchInterpreter extends Interpreter {
headerKeys.addAll(bucketMap.keySet());
buckets.add(bucketMap);
}
- catch (IOException e) {
+ catch (final IOException e) {
logger.error("Processing bucket: " + e.getMessage(), e);
}
}
-
+
final StringBuffer buffer = new StringBuffer();
final String[] keys = headerKeys.toArray(new String[0]);
- for (String key: keys) {
+ for (final String key: keys) {
buffer.append("\t" + key);
}
buffer.deleteCharAt(0);
-
- for (Map<String, Object> bucket : buckets) {
+
+ for (final Map<String, Object> bucket : buckets) {
buffer.append("\n");
-
- for (String key: keys) {
+
+ for (final String key: keys) {
buffer.append(bucket.get(key)).append("\t");
}
buffer.deleteCharAt(buffer.length() - 1);
@@ -479,38 +493,64 @@ public class ElasticsearchInterpreter extends Interpreter {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, resType, resMsg);
}
- private String buildSearchHitsResponseMessage(SearchHit[] hits) {
+ private InterpreterResult buildAggResponseMessage(List<AggWrapper> aggregations) {
+
+ final InterpreterResult.Type resType = InterpreterResult.Type.TABLE;
+ String resMsg = "";
+
+ final Set<String> headerKeys = new HashSet<>();
+ final List<Map<String, Object>> buckets = new LinkedList<>();
+
+ for (final AggWrapper aggregation: aggregations) {
+ final Map<String, Object> bucketMap = JsonFlattener.flattenAsMap(aggregation.getResult());
+ headerKeys.addAll(bucketMap.keySet());
+ buckets.add(bucketMap);
+ }
+
+ final StringBuffer buffer = new StringBuffer();
+ final String[] keys = headerKeys.toArray(new String[0]);
+ for (final String key: keys) {
+ buffer.append("\t" + key);
+ }
+ buffer.deleteCharAt(0);
+
+ for (final Map<String, Object> bucket : buckets) {
+ buffer.append("\n");
+
+ for (final String key: keys) {
+ buffer.append(bucket.get(key)).append("\t");
+ }
+ buffer.deleteCharAt(buffer.length() - 1);
+ }
+
+ resMsg = buffer.toString();
+
+ return new InterpreterResult(InterpreterResult.Code.SUCCESS, resType, resMsg);
+ }
+
+ private String buildSearchHitsResponseMessage(ActionResponse response) {
- if (hits == null || hits.length == 0) {
+ if (response.getHits() == null || response.getHits().size() == 0) {
return "";
}
//First : get all the keys in order to build an ordered list of the values for each hit
//
- final Map<String, Object> hitFields = new HashMap<>();
final List<Map<String, Object>> flattenHits = new LinkedList<>();
final Set<String> keys = new TreeSet<>();
- for (SearchHit hit : hits) {
- // Fields can be found either in _source, or in fields (it depends on the query)
- //
- String json = hit.getSourceAsString();
- if (json == null) {
- hitFields.clear();
- for (SearchHitField hitField : hit.getFields().values()) {
- hitFields.put(hitField.getName(), hitField.getValues());
- }
- json = gson.toJson(hitFields);
- }
+ for (final HitWrapper hit : response.getHits()) {
+
+ final String json = hit.getSourceAsString();
final Map<String, Object> flattenJsonMap = JsonFlattener.flattenAsMap(json);
final Map<String, Object> flattenMap = new HashMap<>();
- for (Iterator<String> iter = flattenJsonMap.keySet().iterator(); iter.hasNext(); ) {
+ for (final Iterator<String> iter = flattenJsonMap.keySet().iterator(); iter.hasNext(); ) {
// Replace keys that match a format like that : [\"keyname\"][0]
final String fieldName = iter.next();
final Matcher fieldNameMatcher = FIELD_NAME_PATTERN.matcher(fieldName);
if (fieldNameMatcher.matches()) {
flattenMap.put(fieldNameMatcher.group(1) + fieldNameMatcher.group(2),
- flattenJsonMap.get(fieldName));
+ flattenJsonMap.get(fieldName));
}
else {
flattenMap.put(fieldName, flattenJsonMap.get(fieldName));
@@ -518,7 +558,7 @@ public class ElasticsearchInterpreter extends Interpreter {
}
flattenHits.add(flattenMap);
- for (String key : flattenMap.keySet()) {
+ for (final String key : flattenMap.keySet()) {
keys.add(key);
}
}
@@ -526,15 +566,15 @@ public class ElasticsearchInterpreter extends Interpreter {
// Next : build the header of the table
//
final StringBuffer buffer = new StringBuffer();
- for (String key : keys) {
+ for (final String key : keys) {
buffer.append(key).append('\t');
}
buffer.replace(buffer.lastIndexOf("\t"), buffer.lastIndexOf("\t") + 1, "\n");
// Finally : build the result by using the key set
//
- for (Map<String, Object> hit : flattenHits) {
- for (String key : keys) {
+ for (final Map<String, Object> hit : flattenHits) {
+ for (final String key : keys) {
final Object val = hit.get(key);
if (val != null) {
buffer.append(val);
@@ -547,17 +587,17 @@ public class ElasticsearchInterpreter extends Interpreter {
return buffer.toString();
}
- private InterpreterResult buildResponseMessage(SearchResponse response) {
+ private InterpreterResult buildResponseMessage(ActionResponse response) {
- final Aggregations aggregations = response.getAggregations();
+ final List<AggWrapper> aggregations = response.getAggregations();
- if (aggregations != null && aggregations.asList().size() > 0) {
+ if (aggregations != null && aggregations.size() > 0) {
return buildAggResponseMessage(aggregations);
}
return new InterpreterResult(
- InterpreterResult.Code.SUCCESS,
- InterpreterResult.Type.TABLE,
- buildSearchHitsResponseMessage(response.getHits().getHits()));
+ InterpreterResult.Code.SUCCESS,
+ InterpreterResult.Type.TABLE,
+ buildSearchHitsResponseMessage(response));
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionException.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionException.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionException.java
new file mode 100644
index 0000000..6846d0a
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.elasticsearch.action;
+
+/**
+ * Runtime exception thrown when there is a problem during an action (search, get, ...).
+ */
+public class ActionException extends RuntimeException {
+
+ public ActionException(String message) {
+ super(message);
+ }
+
+ public ActionException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionResponse.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionResponse.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionResponse.java
new file mode 100644
index 0000000..4141bce
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionResponse.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.elasticsearch.action;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Contains the result of an action (hits, aggregations, ...).
+ */
+public class ActionResponse {
+
+ private boolean succeeded;
+ private long totalHits;
+ private final List<HitWrapper> hits = new LinkedList<>();
+ private final List<AggWrapper> aggregations = new LinkedList<>();
+
+
+ public ActionResponse succeeded(boolean succeeded) {
+ this.succeeded = succeeded;
+ return this;
+ }
+
+ public boolean isSucceeded() {
+ return succeeded;
+ }
+
+ public ActionResponse totalHits(long totalHits) {
+ this.totalHits = totalHits;
+ return this;
+ }
+
+ public long getTotalHits() {
+ return totalHits;
+ }
+
+ public List<HitWrapper> getHits() {
+ return hits;
+ }
+
+ public ActionResponse addHit(HitWrapper hit) {
+ this.hits.add(hit);
+ return this;
+ }
+
+ public List<AggWrapper> getAggregations() {
+ return aggregations;
+ }
+
+ public ActionResponse addAggregation(AggWrapper aggregation) {
+ this.aggregations.add(aggregation);
+ return this;
+ }
+
+ public ActionResponse hit(HitWrapper hit) {
+ this.addHit(hit);
+ return this;
+ }
+
+ public HitWrapper getHit() {
+ return this.hits.get(0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/AggWrapper.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/AggWrapper.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/AggWrapper.java
new file mode 100644
index 0000000..14446db
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/AggWrapper.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.elasticsearch.action;
+
+/**
+ * Contains the result of an aggregation.
+ */
+public class AggWrapper {
+
+ /** Type of an aggregation (to know if there are buckets or not) */
+ public enum AggregationType { SIMPLE, MULTI_BUCKETS }
+
+ private final AggregationType type;
+ private final String result;
+
+ public AggWrapper(AggregationType type, String result) {
+ this.type = type;
+ this.result = result;
+ }
+
+ public AggregationType getType() {
+ return type;
+ }
+
+ public String getResult() {
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/HitWrapper.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/HitWrapper.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/HitWrapper.java
new file mode 100644
index 0000000..3be4514
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/HitWrapper.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.elasticsearch.action;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Contains the data of a hit.
+ */
+public class HitWrapper {
+
+ private final JsonParser parser = new JsonParser();
+
+ private final String index;
+ private final String type;
+ private final String id;
+ private final String source;
+
+ public HitWrapper(String index, String type, String id, String source) {
+ this.index = index;
+ this.type = type;
+ this.id = id;
+ this.source = source;
+ }
+
+ public HitWrapper(String source) {
+ this(null, null, null, source);
+ }
+
+ public String getSourceAsString() {
+ return source;
+ }
+
+ public JsonObject getSourceAsJsonObject() {
+ final JsonElement element = parser.parse(source);
+ return element.getAsJsonObject();
+ }
+
+ public String getIndex() {
+ return index;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public String getId() {
+ return id;
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/ElasticsearchClient.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/ElasticsearchClient.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/ElasticsearchClient.java
new file mode 100644
index 0000000..48e1980
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/ElasticsearchClient.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.elasticsearch.client;
+
+import org.apache.zeppelin.elasticsearch.action.ActionResponse;
+
+/**
+ * Interface that must be implemented by any kind of Elasticsearch client (transport, HTTP, ...).
+ */
+public interface ElasticsearchClient {
+
+ ActionResponse get(String index, String type, String id);
+
+ ActionResponse index(String index, String type, String id, String data);
+
+ ActionResponse delete(String index, String type, String id);
+
+ ActionResponse search(String[] indices, String[] types, String query, int size);
+
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/HttpBasedClient.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/HttpBasedClient.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/HttpBasedClient.java
new file mode 100644
index 0000000..d691597
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/HttpBasedClient.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.elasticsearch.client;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter;
+import org.apache.zeppelin.elasticsearch.action.ActionException;
+import org.apache.zeppelin.elasticsearch.action.ActionResponse;
+import org.apache.zeppelin.elasticsearch.action.AggWrapper;
+import org.apache.zeppelin.elasticsearch.action.AggWrapper.AggregationType;
+import org.apache.zeppelin.elasticsearch.action.HitWrapper;
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+import com.google.common.base.Joiner;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParseException;
+import com.mashape.unirest.http.HttpResponse;
+import com.mashape.unirest.http.JsonNode;
+import com.mashape.unirest.http.Unirest;
+import com.mashape.unirest.http.exceptions.UnirestException;
+import com.mashape.unirest.request.HttpRequest;
+import com.mashape.unirest.request.HttpRequestWithBody;
+
+/**
+ * Elasticsearch client using the HTTP API.
+ */
+public class HttpBasedClient implements ElasticsearchClient {
+
+ private static final String QUERY_STRING_TEMPLATE =
+ "{ \"query\": { \"query_string\": { \"query\": \"_Q_\", \"analyze_wildcard\": \"true\" } } }";
+
+ private final String host;
+ private final int port;
+ private final String username;
+ private final String password;
+
+ private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
+
+ public HttpBasedClient(Properties props) {
+ this.host = props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_HOST);
+ this.port = Integer.parseInt(props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_PORT));
+ this.username = props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_BASIC_AUTH_USERNAME);
+ this.password = props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_BASIC_AUTH_PASSWORD);
+ }
+
+ private boolean isSucceeded(HttpResponse response) {
+ return response.getStatus() >= 200 && response.getStatus() < 300;
+ }
+
+ private JSONObject getParentField(JSONObject parent, String[] fields) {
+ JSONObject obj = parent;
+ for (int i = 0; i < fields.length - 1; i++) {
+ obj = obj.getJSONObject(fields[i]);
+ }
+ return obj;
+ }
+
+ private JSONArray getFieldAsArray(JSONObject obj, String field) {
+ final String[] fields = field.split("/");
+ final JSONObject parent = getParentField(obj, fields);
+ return parent.getJSONArray(fields[fields.length - 1]);
+ }
+
+ private String getFieldAsString(HttpResponse<JsonNode> response, String field) {
+ return getFieldAsString(response.getBody(), field);
+ }
+
+ private String getFieldAsString(JsonNode json, String field) {
+ return json.getObject().get(field).toString();
+ }
+
+ private long getFieldAsLong(HttpResponse<JsonNode> response, String field) {
+ final String[] fields = field.split("/");
+ final JSONObject obj = getParentField(response.getBody().getObject(), fields);
+ return obj.getLong(fields[fields.length - 1]);
+ }
+
+ private String getUrl(String index, String type, String id, boolean useSearch) {
+ try {
+ final StringBuilder buffer = new StringBuilder();
+ buffer.append("http://").append(host).append(":").append(port).append("/");
+ if (StringUtils.isNotEmpty(index)) {
+ buffer.append(index);
+
+ if (StringUtils.isNotEmpty(type)) {
+ buffer.append("/").append(type);
+
+ if (StringUtils.isNotEmpty(id)) {
+ if (useSearch) {
+ final String encodedId = URLEncoder.encode(id, "UTF-8");
+ if (id.equals(encodedId)) {
+ // No difference: use the id directly
+ buffer.append("/").append(id);
+ }
+ else {
+ // There are differences: to avoid problems with some special characters
+ // such as / and # in id, use a "terms" query
+ buffer.append("/_search?source=")
+ .append(URLEncoder
+ .encode("{\"query\":{\"terms\":{\"_id\":[\"" + id + "\"]}}}", "UTF-8"));
+ }
+ }
+ else {
+ buffer.append("/").append(id);
+ }
+ }
+ }
+ }
+ return buffer.toString();
+ }
+ catch (final UnsupportedEncodingException e) {
+ throw new ActionException(e);
+ }
+ }
+
+ private String getUrl(String[] indices, String[] types) {
+ final String inds = indices == null ? null : Joiner.on(",").join(indices);
+ final String typs = types == null ? null : Joiner.on(",").join(types);
+ return getUrl(inds, typs, null, false);
+ }
+
+ @Override
+ public ActionResponse get(String index, String type, String id) {
+ ActionResponse response = null;
+ try {
+ final HttpRequest request = Unirest.get(getUrl(index, type, id, true));
+ if (StringUtils.isNotEmpty(username)) {
+ request.basicAuth(username, password);
+ }
+
+ final HttpResponse<String> result = request.asString();
+ final boolean isSucceeded = isSucceeded(result);
+
+ if (isSucceeded) {
+ final JsonNode body = new JsonNode(result.getBody());
+ if (body.getObject().has("_index")) {
+ response = new ActionResponse()
+ .succeeded(true)
+ .hit(new HitWrapper(
+ getFieldAsString(body, "_index"),
+ getFieldAsString(body, "_type"),
+ getFieldAsString(body, "_id"),
+ getFieldAsString(body, "_source")));
+ }
+ else {
+ final JSONArray hits = getFieldAsArray(body.getObject(), "hits/hits");
+ final JSONObject hit = (JSONObject) hits.iterator().next();
+ response = new ActionResponse()
+ .succeeded(true)
+ .hit(new HitWrapper(
+ hit.getString("_index"),
+ hit.getString("_type"),
+ hit.getString("_id"),
+ hit.opt("_source").toString()));
+ }
+ }
+ else {
+ if (result.getStatus() == 404) {
+ response = new ActionResponse()
+ .succeeded(false);
+ }
+ else {
+ throw new ActionException(result.getBody());
+ }
+ }
+ }
+ catch (final UnirestException e) {
+ throw new ActionException(e);
+ }
+ return response;
+ }
+
+ @Override
+ public ActionResponse delete(String index, String type, String id) {
+ ActionResponse response = null;
+ try {
+ final HttpRequest request = Unirest.delete(getUrl(index, type, id, true));
+ if (StringUtils.isNotEmpty(username)) {
+ request.basicAuth(username, password);
+ }
+
+ final HttpResponse<String> result = request.asString();
+ final boolean isSucceeded = isSucceeded(result);
+
+ if (isSucceeded) {
+ final JsonNode body = new JsonNode(result.getBody());
+ response = new ActionResponse()
+ .succeeded(true)
+ .hit(new HitWrapper(
+ getFieldAsString(body, "_index"),
+ getFieldAsString(body, "_type"),
+ getFieldAsString(body, "_id"),
+ null));
+ }
+ else {
+ throw new ActionException(result.getBody());
+ }
+ }
+ catch (final UnirestException e) {
+ throw new ActionException(e);
+ }
+ return response;
+ }
+
+ @Override
+ public ActionResponse index(String index, String type, String id, String data) {
+ ActionResponse response = null;
+ try {
+ HttpRequestWithBody request = null;
+ if (StringUtils.isEmpty(id)) {
+ request = Unirest.post(getUrl(index, type, id, false));
+ }
+ else {
+ request = Unirest.put(getUrl(index, type, id, false));
+ }
+ request
+ .header("Accept", "application/json")
+ .header("Content-Type", "application/json")
+ .body(data).getHttpRequest();
+ if (StringUtils.isNotEmpty(username)) {
+ request.basicAuth(username, password);
+ }
+
+ final HttpResponse<JsonNode> result = request.asJson();
+ final boolean isSucceeded = isSucceeded(result);
+
+ if (isSucceeded) {
+ response = new ActionResponse()
+ .succeeded(true)
+ .hit(new HitWrapper(
+ getFieldAsString(result, "_index"),
+ getFieldAsString(result, "_type"),
+ getFieldAsString(result, "_id"),
+ null));
+ }
+ else {
+ throw new ActionException(result.getBody().toString());
+ }
+ }
+ catch (final UnirestException e) {
+ throw new ActionException(e);
+ }
+ return response;
+ }
+
+ @Override
+ public ActionResponse search(String[] indices, String[] types, String query, int size) {
+ ActionResponse response = null;
+
+ if (!StringUtils.isEmpty(query)) {
+ // The query can be either JSON-formatted or a Lucene query
+ // So, try to parse it as JSON => if there is an error, treat the query as a Lucene one
+ try {
+ gson.fromJson(query, Map.class);
+ }
+ catch (final JsonParseException e) {
+ // Not valid JSON (or malformed): treat it as a Lucene query
+ query = QUERY_STRING_TEMPLATE.replace("_Q_", query);
+ }
+ }
+
+ try {
+ final HttpRequestWithBody request = Unirest
+ .post(getUrl(indices, types) + "/_search?size=" + size)
+ .header("Content-Type", "application/json");
+
+ if (StringUtils.isNotEmpty(query)) {
+ request.header("Accept", "application/json").body(query);
+ }
+
+ if (StringUtils.isNotEmpty(username)) {
+ request.basicAuth(username, password);
+ }
+
+ final HttpResponse<JsonNode> result = request.asJson();
+ final JSONObject body = result.getBody() != null ? result.getBody().getObject() : null;
+
+ if (isSucceeded(result)) {
+ final long total = getFieldAsLong(result, "hits/total");
+
+ response = new ActionResponse()
+ .succeeded(true)
+ .totalHits(total);
+
+ if (containsAggs(result)) {
+ JSONObject aggregationsMap = body.optJSONObject("aggregations");
+ if (aggregationsMap == null) {
+ aggregationsMap = body.optJSONObject("aggs");
+ }
+
+ for (final String key: aggregationsMap.keySet()) {
+ final JSONObject aggResult = aggregationsMap.getJSONObject(key);
+ if (aggResult.has("buckets")) {
+ // Multi-bucket aggregations
+ final Iterator<Object> buckets = aggResult.getJSONArray("buckets").iterator();
+ while (buckets.hasNext()) {
+ response.addAggregation(
+ new AggWrapper(AggregationType.MULTI_BUCKETS, buckets.next().toString()));
+ }
+ }
+ else {
+ response.addAggregation(
+ new AggWrapper(AggregationType.SIMPLE, aggregationsMap.toString()));
+ }
+ break; // Keep only one aggregation
+ }
+ }
+ else if (size > 0 && total > 0) {
+ final JSONArray hits = getFieldAsArray(body, "hits/hits");
+ final Iterator<Object> iter = hits.iterator();
+
+ while (iter.hasNext()) {
+ final JSONObject hit = (JSONObject) iter.next();
+ final Object data =
+ hit.opt("_source") != null ? hit.opt("_source") : hit.opt("fields");
+ response.addHit(new HitWrapper(
+ hit.getString("_index"),
+ hit.getString("_type"),
+ hit.getString("_id"),
+ data.toString()));
+ }
+ }
+ }
+ else {
+ throw new ActionException(body.get("error").toString());
+ }
+ }
+ catch (final UnirestException e) {
+ throw new ActionException(e);
+ }
+
+ return response;
+ }
+
+ private boolean containsAggs(HttpResponse<JsonNode> result) {
+ return result.getBody() != null &&
+ (result.getBody().getObject().has("aggregations") ||
+ result.getBody().getObject().has("aggs"));
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public String toString() {
+ return "HttpBasedClient [host=" + host + ", port=" + port + ", username=" + username + "]";
+ }
+}
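The id handling in getUrl() above can be sketched in isolation. This is a minimal sketch using only the JDK, not the committed code: the class name GetUrlSketch and the method buildGetUrl are hypothetical, and the host, port, index and type values in main are made up for illustration. It shows the rule described in the comments of the diff: if URL-encoding leaves the id unchanged, the id is appended to the path; otherwise the document is looked up through a "terms" query on _id passed in the source parameter.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical stand-alone sketch of the "useSearch" branch of getUrl().
final class GetUrlSketch {

  // Returns a direct document URL for "safe" ids, a _search URL otherwise.
  static String buildGetUrl(String host, int port, String index, String type, String id) {
    try {
      final String base = "http://" + host + ":" + port + "/" + index + "/" + type;
      final String encodedId = URLEncoder.encode(id, "UTF-8");
      if (id.equals(encodedId)) {
        // Encoding changed nothing, so the id is safe to use as a path segment
        return base + "/" + id;
      }
      // Special characters such as / or # are present: query by _id instead
      final String termsQuery = "{\"query\":{\"terms\":{\"_id\":[\"" + id + "\"]}}}";
      return base + "/_search?source=" + URLEncoder.encode(termsQuery, "UTF-8");
    } catch (final UnsupportedEncodingException e) {
      // UTF-8 is always available on a compliant JVM
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(buildGetUrl("localhost", 9200, "logs", "http", "42"));
    System.out.println(buildGetUrl("localhost", 9200, "logs", "http", "a/b#c"));
  }
}
```

Like the code in the diff, the sketch embeds the id in the JSON text without escaping it, so an id containing a double quote or a backslash would still produce an invalid query.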
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/TransportBasedClient.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/TransportBasedClient.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/TransportBasedClient.java
new file mode 100644
index 0000000..1451019
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/TransportBasedClient.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.elasticsearch.client;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter;
+import org.apache.zeppelin.elasticsearch.action.ActionResponse;
+import org.apache.zeppelin.elasticsearch.action.AggWrapper;
+import org.apache.zeppelin.elasticsearch.action.HitWrapper;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.search.SearchAction;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHitField;
+import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
+import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation;
+import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
+import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonSyntaxException;
+
+/**
+ * Elasticsearch client using the transport protocol.
+ */
+public class TransportBasedClient implements ElasticsearchClient {
+
+ private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ private final Client client;
+
+ public TransportBasedClient(Properties props) throws UnknownHostException {
+ final String host =
+ props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_HOST);
+ final int port = Integer.parseInt(
+ props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_PORT));
+ final String clusterName =
+ props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_CLUSTER_NAME);
+
+ final Settings settings = Settings.settingsBuilder()
+ .put("cluster.name", clusterName)
+ .put(props)
+ .build();
+
+ client = TransportClient.builder().settings(settings).build()
+ .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
+ }
+
+ @Override
+ public ActionResponse get(String index, String type, String id) {
+ final GetResponse getResp = client
+ .prepareGet(index, type, id)
+ .get();
+
+ return new ActionResponse()
+ .succeeded(getResp.isExists())
+ .hit(new HitWrapper(
+ getResp.getIndex(),
+ getResp.getType(),
+ getResp.getId(),
+ getResp.getSourceAsString()));
+ }
+
+ @Override
+ public ActionResponse delete(String index, String type, String id) {
+ final DeleteResponse delResp = client
+ .prepareDelete(index, type, id)
+ .get();
+
+ return new ActionResponse()
+ .succeeded(delResp.isFound())
+ .hit(new HitWrapper(
+ delResp.getIndex(),
+ delResp.getType(),
+ delResp.getId(),
+ null));
+ }
+
+ @Override
+ public ActionResponse index(String index, String type, String id, String data) {
+ final IndexResponse idxResp = client
+ .prepareIndex(index, type, id)
+ .setSource(data)
+ .get();
+
+ return new ActionResponse()
+ .succeeded(idxResp.isCreated())
+ .hit(new HitWrapper(
+ idxResp.getIndex(),
+ idxResp.getType(),
+ idxResp.getId(),
+ null));
+ }
+
+ @Override
+ public ActionResponse search(String[] indices, String[] types, String query, int size) {
+ final SearchRequestBuilder reqBuilder = new SearchRequestBuilder(
+ client, SearchAction.INSTANCE);
+ reqBuilder.setIndices();
+
+ if (indices != null) {
+ reqBuilder.setIndices(indices);
+ }
+ if (types != null) {
+ reqBuilder.setTypes(types);
+ }
+
+ if (!StringUtils.isEmpty(query)) {
+ // The query can be either JSON-formatted or a Lucene query
+ // So, try to parse it as JSON => if there is an error, treat the query as a Lucene one
+ try {
+ @SuppressWarnings("rawtypes")
+ final Map source = gson.fromJson(query, Map.class);
+ reqBuilder.setExtraSource(source);
+ }
+ catch (final JsonSyntaxException e) {
+ // Not valid JSON (or malformed): treat it as a Lucene query
+ reqBuilder.setQuery(QueryBuilders.queryStringQuery(query).analyzeWildcard(true));
+ }
+ }
+
+ reqBuilder.setSize(size);
+
+ final SearchResponse searchResp = reqBuilder.get();
+
+ final ActionResponse actionResp = new ActionResponse()
+ .succeeded(true)
+ .totalHits(searchResp.getHits().getTotalHits());
+
+ if (searchResp.getAggregations() != null) {
+ setAggregations(searchResp.getAggregations(), actionResp);
+ }
+ else {
+ for (final SearchHit hit: searchResp.getHits()) {
+ // Fields can be found either in _source, or in fields (it depends on the query)
+ // => specific for elasticsearch's version < 5
+ //
+ String src = hit.getSourceAsString();
+ if (src == null) {
+ final Map<String, Object> hitFields = new HashMap<>();
+ for (final SearchHitField hitField : hit.getFields().values()) {
+ hitFields.put(hitField.getName(), hitField.getValues());
+ }
+ src = gson.toJson(hitFields);
+ }
+ actionResp.addHit(new HitWrapper(hit.getIndex(), hit.getType(), hit.getId(), src));
+ }
+ }
+
+ return actionResp;
+ }
+
+ private void setAggregations(Aggregations aggregations, ActionResponse actionResp) {
+ // Only the result of the first aggregation is returned
+ //
+ final Aggregation agg = aggregations.asList().get(0);
+
+ if (agg instanceof InternalMetricsAggregation) {
+ actionResp.addAggregation(new AggWrapper(AggWrapper.AggregationType.SIMPLE,
+ XContentHelper.toString((InternalMetricsAggregation) agg).toString()));
+ }
+ else if (agg instanceof InternalSingleBucketAggregation) {
+ actionResp.addAggregation(new AggWrapper(AggWrapper.AggregationType.SIMPLE,
+ XContentHelper.toString((InternalSingleBucketAggregation) agg).toString()));
+ }
+ else if (agg instanceof InternalMultiBucketAggregation) {
+ final Set<String> headerKeys = new HashSet<>();
+ final List<Map<String, Object>> buckets = new LinkedList<>();
+ final InternalMultiBucketAggregation multiBucketAgg = (InternalMultiBucketAggregation) agg;
+
+ for (final MultiBucketsAggregation.Bucket bucket : multiBucketAgg.getBuckets()) {
+ try {
+ final XContentBuilder builder = XContentFactory.jsonBuilder();
+ bucket.toXContent(builder, null);
+ actionResp.addAggregation(
+ new AggWrapper(AggWrapper.AggregationType.MULTI_BUCKETS, builder.string()));
+ }
+ catch (final IOException e) {
+ // Ignored
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "TransportBasedClient []";
+ }
+}
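Both clients above apply the same rule to the query text: try it as JSON first, and fall back to a Lucene query string if parsing fails. The transport client hands the fallback to QueryBuilders.queryStringQuery, while the HTTP client substitutes the raw text into QUERY_STRING_TEMPLATE. The following is a hedged, dependency-free sketch of the HTTP case. The names QueryBodySketch, looksLikeJsonObject, escapeForJsonString and toSearchBody are hypothetical. The brace check is only a crude stand-in for the gson.fromJson probe, and the escaping step is an assumption added here to keep the body valid when the Lucene query contains quotes; it is not part of the commit.

```java
// Hypothetical sketch: turn user input into a _search request body.
final class QueryBodySketch {

  private static final String TEMPLATE =
      "{ \"query\": { \"query_string\": { \"query\": \"_Q_\", \"analyze_wildcard\": \"true\" } } }";

  // Crude stand-in for a real JSON parse: only looks at the outer braces.
  static boolean looksLikeJsonObject(String query) {
    final String trimmed = query.trim();
    return trimmed.startsWith("{") && trimmed.endsWith("}");
  }

  // Escapes characters that would end or corrupt a JSON string literal.
  static String escapeForJsonString(String text) {
    final StringBuilder out = new StringBuilder();
    for (int i = 0; i < text.length(); i++) {
      final char c = text.charAt(i);
      switch (c) {
        case '"': out.append("\\\""); break;
        case '\\': out.append("\\\\"); break;
        case '\n': out.append("\\n"); break;
        case '\r': out.append("\\r"); break;
        case '\t': out.append("\\t"); break;
        default:
          if (c < 0x20) {
            out.append(String.format("\\u%04x", (int) c));
          } else {
            out.append(c);
          }
      }
    }
    return out.toString();
  }

  // JSON input is passed through; anything else is wrapped as a query_string query.
  static String toSearchBody(String query) {
    if (looksLikeJsonObject(query)) {
      return query;
    }
    return TEMPLATE.replace("_Q_", escapeForJsonString(query));
  }

  public static void main(String[] args) {
    System.out.println(toSearchBody("status:200"));
    System.out.println(toSearchBody("msg:\"a b\""));
    System.out.println(toSearchBody("{ \"query\": { \"match_all\": {} } }"));
  }
}
```

A real implementation would more likely build the body with a JSON library than escape by hand; the manual escaping here only keeps the sketch free of dependencies.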
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/resources/interpreter-setting.json b/elasticsearch/src/main/resources/interpreter-setting.json
index 089c811..18200ae 100644
--- a/elasticsearch/src/main/resources/interpreter-setting.json
+++ b/elasticsearch/src/main/resources/interpreter-setting.json
@@ -16,6 +16,12 @@
"defaultValue": "9300",
"description": "The port for Elasticsearch"
},
+ "elasticsearch.client.type": {
+ "envName": "ELASTICSEARCH_CLIENT_TYPE",
+ "propertyName": "elasticsearch.client.type",
+ "defaultValue": "transport",
+ "description": "The type of client for Elasticsearch (transport or http)"
+ },
"elasticsearch.cluster.name": {
"envName": "ELASTICSEARCH_CLUSTER_NAME",
"propertyName": "elasticsearch.cluster.name",
@@ -27,6 +33,18 @@
"propertyName": "elasticsearch.result.size",
"defaultValue": "10",
"description": "The size of the result set of a search query"
+ },
+ "elasticsearch.basicauth.username": {
+ "envName": "ELASTICSEARCH_BASIC_AUTH_USERNAME",
+ "propertyName": "elasticsearch.basicauth.username",
+ "defaultValue": "",
+ "description": "Username for basic authentication"
+ },
+ "elasticsearch.basicauth.password": {
+ "envName": "ELASTICSEARCH_BASIC_AUTH_PASSWORD",
+ "propertyName": "elasticsearch.basicauth.password",
+ "defaultValue": "",
+ "description": "Password for basic authentication"
}
},
"editor": {
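The new elasticsearch.client.type setting above takes either transport (the default) or http. How the interpreter consumes it is not shown in this part of the diff, so the following is only a sketch of one way such a property could be resolved. The class ClientTypeSketch, the enum ClientType and the method resolve are hypothetical names; only the property name and its two documented values come from the settings file.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical sketch: map the "elasticsearch.client.type" property to a client kind.
final class ClientTypeSketch {

  enum ClientType { TRANSPORT, HTTP }

  static ClientType resolve(Properties props) {
    // "transport" mirrors the defaultValue declared in interpreter-setting.json
    final String raw = props.getProperty("elasticsearch.client.type", "transport");
    switch (raw.trim().toLowerCase(Locale.ROOT)) {
      case "transport":
        return ClientType.TRANSPORT;
      case "http":
        return ClientType.HTTP;
      default:
        throw new IllegalArgumentException("Unknown elasticsearch.client.type: " + raw);
    }
  }

  public static void main(String[] args) {
    final Properties props = new Properties();
    System.out.println(resolve(props));
    props.setProperty("elasticsearch.client.type", "http");
    System.out.println(resolve(props));
  }
}
```

The port setting in the same file keeps its default of 9300, the usual transport port, so switching the client type to http will usually also require pointing elasticsearch.port at the node's HTTP port (9200 by default in Elasticsearch).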
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java b/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java
index 8d3a14b..aece163 100644
--- a/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java
+++ b/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.elasticsearch;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import java.util.ArrayList;
@@ -27,8 +28,11 @@ import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.math.RandomUtils;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@@ -40,13 +44,19 @@ import org.elasticsearch.node.NodeBuilder;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.experimental.theories.DataPoint;
+import org.junit.experimental.theories.Theories;
+import org.junit.experimental.theories.Theory;
+import org.junit.runner.RunWith;
+@RunWith(Theories.class)
public class ElasticsearchInterpreterTest {
+ @DataPoint public static ElasticsearchInterpreter transportInterpreter;
+ @DataPoint public static ElasticsearchInterpreter httpInterpreter;
+
private static Client elsClient;
private static Node elsNode;
- private static ElasticsearchInterpreter interpreter;
private static final String[] METHODS = { "GET", "PUT", "DELETE", "POST" };
private static final int[] STATUS = { 200, 404, 500, 403 };
@@ -57,6 +67,8 @@ public class ElasticsearchInterpreterTest {
private static final String ELS_HTTP_PORT = "10200";
private static final String ELS_PATH = "/tmp/els";
+ private static final AtomicInteger deleteId = new AtomicInteger(2);
+
@BeforeClass
public static void populate() throws IOException {
@@ -80,7 +92,7 @@ public class ElasticsearchInterpreterTest {
.endObject()
.endObject().endObject().endObject()).get();
- for (int i = 0; i < 50; i++) {
+ for (int i = 0; i < 48; i++) {
elsClient.prepareIndex("logs", "http", "" + i)
.setRefresh(true)
.setSource(jsonBuilder()
@@ -97,22 +109,50 @@ public class ElasticsearchInterpreterTest {
.get();
}
+ for (int i = 1; i < 3; i++) {
+ elsClient.prepareIndex("logs", "http", "very/strange/id#" + i)
+ .setRefresh(true)
+ .setSource(jsonBuilder()
+ .startObject()
+ .field("date", new Date())
+ .startObject("request")
+ .field("method", METHODS[RandomUtils.nextInt(METHODS.length)])
+ .field("url", "/zeppelin/" + UUID.randomUUID().toString())
+ .field("headers", Arrays.asList("Accept: *.*", "Host: apache.org"))
+ .endObject()
+ .field("status", STATUS[RandomUtils.nextInt(STATUS.length)])
+ .field("content_length", RandomUtils.nextInt(2000))
+ )
+ .get();
+ }
+
final Properties props = new Properties();
props.put(ElasticsearchInterpreter.ELASTICSEARCH_HOST, ELS_HOST);
- props.put(ElasticsearchInterpreter.ELASTICSEARCH_PORT, ELS_TRANSPORT_PORT);
props.put(ElasticsearchInterpreter.ELASTICSEARCH_CLUSTER_NAME, ELS_CLUSTER_NAME);
- interpreter = new ElasticsearchInterpreter(props);
- interpreter.open();
+
+ props.put(ElasticsearchInterpreter.ELASTICSEARCH_PORT, ELS_TRANSPORT_PORT);
+ props.put(ElasticsearchInterpreter.ELASTICSEARCH_CLIENT_TYPE, "transport");
+ transportInterpreter = new ElasticsearchInterpreter(props);
+ transportInterpreter.open();
+
+ props.put(ElasticsearchInterpreter.ELASTICSEARCH_PORT, ELS_HTTP_PORT);
+ props.put(ElasticsearchInterpreter.ELASTICSEARCH_CLIENT_TYPE, "http");
+ httpInterpreter = new ElasticsearchInterpreter(props);
+ httpInterpreter.open();
}
@AfterClass
public static void clean() {
- if (interpreter != null) {
- interpreter.close();
+ if (transportInterpreter != null) {
+ transportInterpreter.close();
+ }
+
+ if (httpInterpreter != null) {
+ httpInterpreter.close();
}
if (elsClient != null) {
- elsClient.admin().indices().delete(new DeleteIndexRequest("logs")).actionGet();
+ elsClient.admin().indices().delete(new DeleteIndexRequest("*")).actionGet();
elsClient.close();
}
@@ -121,97 +161,140 @@ public class ElasticsearchInterpreterTest {
}
}
- @Test
- public void testCount() {
+ private InterpreterContext buildContext(String noteAndParagraphId) {
+ final AngularObjectRegistry angularObjReg = new AngularObjectRegistry("elasticsearch", null);
+ return new InterpreterContext(noteAndParagraphId, noteAndParagraphId, null, null, null, null, null,
+ null, angularObjReg , null, null, null);
+ }
+
+ @Theory
+ public void testCount(ElasticsearchInterpreter interpreter) {
+
+ final InterpreterContext ctx = buildContext("testCount");
- InterpreterResult res = interpreter.interpret("count /unknown", null);
+ InterpreterResult res = interpreter.interpret("count /unknown", ctx);
assertEquals(Code.ERROR, res.code());
- res = interpreter.interpret("count /logs", null);
+ res = interpreter.interpret("count /logs", ctx);
+ assertEquals(Code.SUCCESS, res.code());
assertEquals("50", res.message().get(0).getData());
+ assertNotNull(ctx.getAngularObjectRegistry().get("count_testCount", null, null));
+ assertEquals(50L, ctx.getAngularObjectRegistry().get("count_testCount", null, null).get());
+
+ res = interpreter.interpret("count /logs { \"query\": { \"match\": { \"status\": 500 } } }", ctx);
+ assertEquals(Code.SUCCESS, res.code());
}
- @Test
- public void testGet() {
+ @Theory
+ public void testGet(ElasticsearchInterpreter interpreter) {
+
+ final InterpreterContext ctx = buildContext("get");
- InterpreterResult res = interpreter.interpret("get /logs/http/unknown", null);
+ InterpreterResult res = interpreter.interpret("get /logs/http/unknown", ctx);
assertEquals(Code.ERROR, res.code());
- res = interpreter.interpret("get /logs/http/10", null);
+ res = interpreter.interpret("get /logs/http/unknown/unknown", ctx);
+ assertEquals(Code.ERROR, res.code());
+
+ res = interpreter.interpret("get /unknown/unknown/unknown", ctx);
+ assertEquals(Code.ERROR, res.code());
+
+ res = interpreter.interpret("get /logs/http/very/strange/id#1", ctx);
+ assertEquals(Code.SUCCESS, res.code());
+
+ res = interpreter.interpret("get /logs/http/4", ctx);
+ assertEquals(Code.SUCCESS, res.code());
+
+ res = interpreter.interpret("get /logs/_all/4", ctx);
assertEquals(Code.SUCCESS, res.code());
}
- @Test
- public void testSearch() {
+ @Theory
+ public void testSearch(ElasticsearchInterpreter interpreter) {
+
+ final InterpreterContext ctx = buildContext("search");
- InterpreterResult res = interpreter.interpret("size 10\nsearch /logs *", null);
+ InterpreterResult res = interpreter.interpret("size 10\nsearch /logs *", ctx);
assertEquals(Code.SUCCESS, res.code());
- res = interpreter.interpret("search /logs {{{hello}}}", null);
+ res = interpreter.interpret("search /logs {{{hello}}}", ctx);
assertEquals(Code.ERROR, res.code());
- res = interpreter.interpret("search /logs { \"query\": { \"match\": { \"status\": 500 } } }", null);
+ res = interpreter.interpret("search /logs { \"query\": { \"match\": { \"status\": 500 } } }", ctx);
assertEquals(Code.SUCCESS, res.code());
- res = interpreter.interpret("search /logs status:404", null);
+ res = interpreter.interpret("search /logs status:404", ctx);
assertEquals(Code.SUCCESS, res.code());
- res = interpreter.interpret("search /logs { \"fields\": [ \"date\", \"request.headers\" ], \"query\": { \"match\": { \"status\": 500 } } }", null);
+ res = interpreter.interpret("search /logs { \"fields\": [ \"date\", \"request.headers\" ], \"query\": { \"match\": { \"status\": 500 } } }", ctx);
assertEquals(Code.SUCCESS, res.code());
}
- @Test
- public void testAgg() {
+ @Theory
+ public void testAgg(ElasticsearchInterpreter interpreter) {
+
+ final InterpreterContext ctx = buildContext("agg");
// Single-value metric
InterpreterResult res = interpreter.interpret("search /logs { \"aggs\" : { \"distinct_status_count\" : " +
- " { \"cardinality\" : { \"field\" : \"status\" } } } }", null);
+ " { \"cardinality\" : { \"field\" : \"status\" } } } }", ctx);
assertEquals(Code.SUCCESS, res.code());
// Multi-value metric
res = interpreter.interpret("search /logs { \"aggs\" : { \"content_length_stats\" : " +
- " { \"extended_stats\" : { \"field\" : \"content_length\" } } } }", null);
+ " { \"extended_stats\" : { \"field\" : \"content_length\" } } } }", ctx);
assertEquals(Code.SUCCESS, res.code());
// Single bucket
res = interpreter.interpret("search /logs { \"aggs\" : { " +
" \"200_OK\" : { \"filter\" : { \"term\": { \"status\": \"200\" } }, " +
- " \"aggs\" : { \"avg_length\" : { \"avg\" : { \"field\" : \"content_length\" } } } } } }", null);
+ " \"aggs\" : { \"avg_length\" : { \"avg\" : { \"field\" : \"content_length\" } } } } } }", ctx);
assertEquals(Code.SUCCESS, res.code());
// Multi-buckets
res = interpreter.interpret("search /logs { \"aggs\" : { \"status_count\" : " +
- " { \"terms\" : { \"field\" : \"status\" } } } }", null);
+ " { \"terms\" : { \"field\" : \"status\" } } } }", ctx);
assertEquals(Code.SUCCESS, res.code());
-
+
res = interpreter.interpret("search /logs { \"aggs\" : { " +
" \"length\" : { \"terms\": { \"field\": \"status\" }, " +
- " \"aggs\" : { \"sum_length\" : { \"sum\" : { \"field\" : \"content_length\" } }, \"sum_status\" : { \"sum\" : { \"field\" : \"status\" } } } } } }", null);
+ " \"aggs\" : { \"sum_length\" : { \"sum\" : { \"field\" : \"content_length\" } }, \"sum_status\" : { \"sum\" : { \"field\" : \"status\" } } } } } }", ctx);
assertEquals(Code.SUCCESS, res.code());
}
- @Test
- public void testIndex() {
+ @Theory
+ public void testIndex(ElasticsearchInterpreter interpreter) {
InterpreterResult res = interpreter.interpret("index /logs { \"date\": \"" + new Date() + "\", \"method\": \"PUT\", \"status\": \"500\" }", null);
assertEquals(Code.ERROR, res.code());
+ res = interpreter.interpret("index /logs/http { bad ", null);
+ assertEquals(Code.ERROR, res.code());
+
res = interpreter.interpret("index /logs/http { \"date\": \"2015-12-06T14:54:23.368Z\", \"method\": \"PUT\", \"status\": \"500\" }", null);
assertEquals(Code.SUCCESS, res.code());
+
+ res = interpreter.interpret("index /logs/http/1000 { \"date\": \"2015-12-06T14:54:23.368Z\", \"method\": \"PUT\", \"status\": \"500\" }", null);
+ assertEquals(Code.SUCCESS, res.code());
}
- @Test
- public void testDelete() {
+ @Theory
+ public void testDelete(ElasticsearchInterpreter interpreter) {
InterpreterResult res = interpreter.interpret("delete /logs/http/unknown", null);
assertEquals(Code.ERROR, res.code());
- res = interpreter.interpret("delete /logs/http/11", null);
- assertEquals("11", res.message().get(0).getData());
+ res = interpreter.interpret("delete /unknown/unknown/unknown", null);
+ assertEquals(Code.ERROR, res.code());
+
+ final int testDeleteId = deleteId.decrementAndGet();
+ res = interpreter.interpret("delete /logs/http/" + testDeleteId, null);
+ assertEquals(Code.SUCCESS, res.code());
+ assertEquals("" + testDeleteId, res.message().get(0).getData());
}
- @Test
- public void testMisc() {
+ @Theory
+ public void testMisc(ElasticsearchInterpreter interpreter) {
InterpreterResult res = interpreter.interpret(null, null);
assertEquals(Code.SUCCESS, res.code());
@@ -220,23 +303,23 @@ public class ElasticsearchInterpreterTest {
assertEquals(Code.SUCCESS, res.code());
}
- @Test
- public void testCompletion() {
- List expectedResultOne = Arrays.asList(new InterpreterCompletion("count", "count"));
- List expectedResultTwo = Arrays.asList(new InterpreterCompletion("help", "help"));
+ @Theory
+ public void testCompletion(ElasticsearchInterpreter interpreter) {
+ final List<InterpreterCompletion> expectedResultOne = Arrays.asList(new InterpreterCompletion("count", "count"));
+ final List<InterpreterCompletion> expectedResultTwo = Arrays.asList(new InterpreterCompletion("help", "help"));
- List<InterpreterCompletion> resultOne = interpreter.completion("co", 0);
- List<InterpreterCompletion> resultTwo = interpreter.completion("he", 0);
- List<InterpreterCompletion> resultAll = interpreter.completion("", 0);
+ final List<InterpreterCompletion> resultOne = interpreter.completion("co", 0);
+ final List<InterpreterCompletion> resultTwo = interpreter.completion("he", 0);
+ final List<InterpreterCompletion> resultAll = interpreter.completion("", 0);
Assert.assertEquals(expectedResultOne, resultOne);
Assert.assertEquals(expectedResultTwo, resultTwo);
- List allCompletionList = new ArrayList<>();
- for (InterpreterCompletion ic : resultAll) {
+ final List<String> allCompletionList = new ArrayList<>();
+ for (final InterpreterCompletion ic : resultAll) {
allCompletionList.add(ic.getName());
}
- Assert.assertEquals(interpreter.COMMANDS, allCompletionList);
+ Assert.assertEquals(ElasticsearchInterpreter.COMMANDS, allCompletionList);
}
}
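The tests above index and fetch documents whose ids look like very/strange/id#1. Over HTTP such an id cannot go into the request path verbatim, since '/' would be read as a path separator and '#' as the start of a fragment. The sketch below shows one way to percent-encode the id before building the path. IdPathSketch and documentPath are hypothetical names, and the HTTP client added in this commit may handle the encoding differently.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper, not part of Zeppelin: builds a document path for the
// Elasticsearch REST API with the id percent-encoded.
class IdPathSketch {

  static String documentPath(String index, String type, String id) {
    try {
      // URLEncoder turns '/' into %2F and '#' into %23. Note that it applies
      // form encoding, so a space in the id would become '+' rather than %20.
      return "/" + index + "/" + type + "/" + URLEncoder.encode(id, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      // Every Java platform is required to support UTF-8.
      throw new IllegalStateException("UTF-8 is not available", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(documentPath("logs", "http", "very/strange/id#1"));
    // prints /logs/http/very%2Fstrange%2Fid%231
  }
}
```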
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e763b3bf/zeppelin-distribution/src/bin_license/LICENSE
----------------------------------------------------------------------
diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE
index 1acdf58..d27f27d 100644
--- a/zeppelin-distribution/src/bin_license/LICENSE
+++ b/zeppelin-distribution/src/bin_license/LICENSE
@@ -13,6 +13,7 @@ The following components are provided under Apache License.
(Apache 2.0) Apache Commons Exec (commons-exec:commons-exec:1.3 - http://commons.apache.org/exec/)
(Apache 2.0) Http Components (org.apache.httpcomponents:httpcore:4.3.3 - https://github.com/apache/httpclient)
(Apache 2.0) Http Components (org.apache.httpcomponents:httpclient:4.3.6 - https://github.com/apache/httpclient)
+ (Apache 2.0) Http Components (org.apache.httpcomponents:httpasyncclient:4.0.2 - https://github.com/apache/httpclient)
(Apache 2.0) Apache Commons Lang (org.apache.commons:commons-lang:2.5 - http://commons.apache.org/proper/commons-lang/)
(Apache 2.0) Apache Commons Lang 3 (org.apache.commons:commons-lang3:3.4 - http://commons.apache.org/proper/commons-lang/)
(Apache 2.0) Apache Commons Math 3 (org.apache.commons:commons-math3:3.6.1 - http://commons.apache.org/proper/commons-math/)
@@ -266,6 +267,7 @@ The text of each license is also included at licenses/LICENSE-[project]-[version
(The MIT License) Java String Similarity 0.12 (info.debatty:java-string-similarity:0.12 - https://github.com/tdebatty/java-string-similarity)
(The MIT License) Java LSH 0.10 (info.debatty:java-lsh:0.10 - https://github.com/tdebatty/java-LSH)
(The MIT License) JSoup 1.6.1 (org.jsoup:jsoup:1.6.1 - https://github.com/jhy/jsoup/)
+ (The MIT License) Unirest 1.4.9 (com.mashape.unirest:unirest-java:1.4.9 - https://github.com/Mashape/unirest-java)
(The MIT License) ngclipboard v1.1.1 (https://github.com/sachinchoolur/ngclipboard) - https://github.com/sachinchoolur/ngclipboard/blob/1.1.1/LICENSE)
========================================================================