You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by bz...@apache.org on 2016/10/19 04:51:49 UTC

zeppelin git commit: [ZEPPELIN-1537] Elasticsearch improvement for results of aggregations

Repository: zeppelin
Updated Branches:
  refs/heads/master abf0470a3 -> fab3e5ff9


[ZEPPELIN-1537] Elasticsearch improvement for results of aggregations

### What is this PR for?
The result of an aggregation query returned by the interpreter contains only "key" and "doc_count" in the case of a multi-bucket aggregation.
However, the result returned by Elasticsearch can contain more data, depending on the query (for example, the values of sub-aggregations).
This PR improves the result returned by the interpreter so that this additional data is included.

### What type of PR is it?
[Improvement]

### Todos
* [X] - Dev of the improvement in the interpreter
* [X] - Add a test case

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1537

### How should this be tested?
In a paragraph, enter a query with multiple aggregations:
search /logs { "aggs" : {
            "length" : { "terms": { "field": "status" },
            "aggs" : { "sum_length" : { "sum" : { "field" : "content_length" } } } } } }

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Bruno Bonnin <bb...@gmail.com>

Closes #1508 from bbonnin/master and squashes the following commits:

a0a7bb9 [Bruno Bonnin] Elasticsearch improvement for results of aggregations


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/fab3e5ff
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/fab3e5ff
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/fab3e5ff

Branch: refs/heads/master
Commit: fab3e5ff93736bd4cc366516b3de403fff2eea8b
Parents: abf0470
Author: Bruno Bonnin <bb...@gmail.com>
Authored: Wed Oct 12 23:20:25 2016 +0200
Committer: Alexander Bezzubov <bz...@apache.org>
Committed: Wed Oct 19 13:51:44 2016 +0900

----------------------------------------------------------------------
 .../elasticsearch/ElasticsearchInterpreter.java | 39 ++++++++++++++++----
 .../ElasticsearchInterpreterTest.java           | 12 +++++-
 2 files changed, 43 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fab3e5ff/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java
index 511f1b5..549b5f2 100644
--- a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java
+++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java
@@ -22,6 +22,7 @@ import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -35,7 +36,6 @@ import java.util.regex.Pattern;
 import org.apache.commons.lang.StringUtils;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.elasticsearch.action.delete.DeleteResponse;
@@ -48,6 +48,8 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.SearchHit;
@@ -437,14 +439,37 @@ public class ElasticsearchInterpreter extends Interpreter {
       resMsg = XContentHelper.toString((InternalSingleBucketAggregation) agg).toString();
     }
     else if (agg instanceof InternalMultiBucketAggregation) {
-      final StringBuffer buffer = new StringBuffer("key\tdoc_count");
-
+      final Set<String> headerKeys = new HashSet<>();
+      final List<Map<String, Object>> buckets = new LinkedList<>();
       final InternalMultiBucketAggregation multiBucketAgg = (InternalMultiBucketAggregation) agg;
+      
       for (MultiBucketsAggregation.Bucket bucket : multiBucketAgg.getBuckets()) {
-        buffer.append("\n")
-          .append(bucket.getKeyAsString())
-          .append("\t")
-          .append(bucket.getDocCount());
+        try {
+          final XContentBuilder builder = XContentFactory.jsonBuilder();
+          bucket.toXContent(builder, null);
+          final Map<String, Object> bucketMap = JsonFlattener.flattenAsMap(builder.string());
+          headerKeys.addAll(bucketMap.keySet());
+          buckets.add(bucketMap);
+        }
+        catch (IOException e) {
+          logger.error("Processing bucket: " + e.getMessage(), e);
+        }
+      }
+            
+      final StringBuffer buffer = new StringBuffer();
+      final String[] keys = headerKeys.toArray(new String[0]);
+      for (String key: keys) {
+        buffer.append("\t" + key);
+      }
+      buffer.deleteCharAt(0);
+      
+      for (Map<String, Object> bucket : buckets) {
+        buffer.append("\n");
+        
+        for (String key: keys) {
+          buffer.append(bucket.get(key)).append("\t");
+        }
+        buffer.deleteCharAt(buffer.length() - 1);
       }
 
       resType = InterpreterResult.Type.TABLE;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fab3e5ff/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java b/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java
index e23cbb6..e47a49e 100644
--- a/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java
+++ b/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java
@@ -21,7 +21,12 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
 
 import org.apache.commons.lang.math.RandomUtils;
 import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -178,6 +183,11 @@ public class ElasticsearchInterpreterTest {
     res = interpreter.interpret("search /logs { \"aggs\" : { \"status_count\" : " +
             " { \"terms\" : { \"field\" : \"status\" } } } }", null);
     assertEquals(Code.SUCCESS, res.code());
+    
+    res = interpreter.interpret("search /logs { \"aggs\" : { " +
+            " \"length\" : { \"terms\": { \"field\": \"status\" }, " +
+            "   \"aggs\" : { \"sum_length\" : { \"sum\" : { \"field\" : \"content_length\" } }, \"sum_status\" : { \"sum\" : { \"field\" : \"status\" } } } } } }", null);
+    assertEquals(Code.SUCCESS, res.code());
   }
 
   @Test