You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by bz...@apache.org on 2016/10/19 04:51:49 UTC
zeppelin git commit: [ZEPPELIN-1537] Elasticsearch improvement for
results of aggregations
Repository: zeppelin
Updated Branches:
refs/heads/master abf0470a3 -> fab3e5ff9
[ZEPPELIN-1537] Elasticsearch improvement for results of aggregations
### What is this PR for?
The result of an aggregation query returned by the interpreter contains only "key" and "doc_count" in the case of a multi-bucket aggregation.
But the result returned by Elasticsearch can contain more data according to the query.
This PR is an improvement of the result returned by the interpreter.
### What type of PR is it?
[Improvement]
### Todos
* [X] - Dev of the improvement in the interpreter
* [X] - Add a test case
### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1537
### How should this be tested?
In a paragraph, enter a query with multiple aggregations:
search /logs { "aggs" : {
"length" : { "terms": { "field": "status" },
"aggs" : { "sum_length" : { "sum" : { "field" : "content_length" } } } } } }
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Bruno Bonnin <bb...@gmail.com>
Closes #1508 from bbonnin/master and squashes the following commits:
a0a7bb9 [Bruno Bonnin] Elasticsearch improvement for results of aggregations
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/fab3e5ff
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/fab3e5ff
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/fab3e5ff
Branch: refs/heads/master
Commit: fab3e5ff93736bd4cc366516b3de403fff2eea8b
Parents: abf0470
Author: Bruno Bonnin <bb...@gmail.com>
Authored: Wed Oct 12 23:20:25 2016 +0200
Committer: Alexander Bezzubov <bz...@apache.org>
Committed: Wed Oct 19 13:51:44 2016 +0900
----------------------------------------------------------------------
.../elasticsearch/ElasticsearchInterpreter.java | 39 ++++++++++++++++----
.../ElasticsearchInterpreterTest.java | 12 +++++-
2 files changed, 43 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fab3e5ff/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java
index 511f1b5..549b5f2 100644
--- a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java
+++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java
@@ -22,6 +22,7 @@ import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -35,7 +36,6 @@ import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.elasticsearch.action.delete.DeleteResponse;
@@ -48,6 +48,8 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
@@ -437,14 +439,37 @@ public class ElasticsearchInterpreter extends Interpreter {
resMsg = XContentHelper.toString((InternalSingleBucketAggregation) agg).toString();
}
else if (agg instanceof InternalMultiBucketAggregation) {
- final StringBuffer buffer = new StringBuffer("key\tdoc_count");
-
+ final Set<String> headerKeys = new HashSet<>();
+ final List<Map<String, Object>> buckets = new LinkedList<>();
final InternalMultiBucketAggregation multiBucketAgg = (InternalMultiBucketAggregation) agg;
+
for (MultiBucketsAggregation.Bucket bucket : multiBucketAgg.getBuckets()) {
- buffer.append("\n")
- .append(bucket.getKeyAsString())
- .append("\t")
- .append(bucket.getDocCount());
+ try {
+ final XContentBuilder builder = XContentFactory.jsonBuilder();
+ bucket.toXContent(builder, null);
+ final Map<String, Object> bucketMap = JsonFlattener.flattenAsMap(builder.string());
+ headerKeys.addAll(bucketMap.keySet());
+ buckets.add(bucketMap);
+ }
+ catch (IOException e) {
+ logger.error("Processing bucket: " + e.getMessage(), e);
+ }
+ }
+
+ final StringBuffer buffer = new StringBuffer();
+ final String[] keys = headerKeys.toArray(new String[0]);
+ for (String key: keys) {
+ buffer.append("\t" + key);
+ }
+ buffer.deleteCharAt(0);
+
+ for (Map<String, Object> bucket : buckets) {
+ buffer.append("\n");
+
+ for (String key: keys) {
+ buffer.append(bucket.get(key)).append("\t");
+ }
+ buffer.deleteCharAt(buffer.length() - 1);
}
resType = InterpreterResult.Type.TABLE;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fab3e5ff/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java b/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java
index e23cbb6..e47a49e 100644
--- a/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java
+++ b/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java
@@ -21,7 +21,12 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -178,6 +183,11 @@ public class ElasticsearchInterpreterTest {
res = interpreter.interpret("search /logs { \"aggs\" : { \"status_count\" : " +
" { \"terms\" : { \"field\" : \"status\" } } } }", null);
assertEquals(Code.SUCCESS, res.code());
+
+ res = interpreter.interpret("search /logs { \"aggs\" : { " +
+ " \"length\" : { \"terms\": { \"field\": \"status\" }, " +
+ " \"aggs\" : { \"sum_length\" : { \"sum\" : { \"field\" : \"content_length\" } }, \"sum_status\" : { \"sum\" : { \"field\" : \"status\" } } } } } }", null);
+ assertEquals(Code.SUCCESS, res.code());
}
@Test