You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by pr...@apache.org on 2015/08/11 14:20:50 UTC
[27/50] [abbrv] incubator-lens git commit: LENS-252 : Add elastic
search driver
LENS-252 : Add elastic search driver
Project: http://git-wip-us.apache.org/repos/asf/incubator-lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-lens/commit/bffa78c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-lens/tree/bffa78c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-lens/diff/bffa78c9
Branch: refs/heads/current-release-line
Commit: bffa78c9705d8eb75f72e0c916294af162e722c7
Parents: 882182a
Author: Amruth S <am...@gmail.com>
Authored: Fri Jul 31 13:44:54 2015 +0530
Committer: Amareshwari Sriramadasu <am...@apache.org>
Committed: Fri Jul 31 13:44:54 2015 +0530
----------------------------------------------------------------------
.../apache/lens/doc/TestGenerateConfigDoc.java | 21 +
lens-driver-es/pom.xml | 79 +++
.../lens/driver/es/ASTTraverserForES.java | 359 ++++++++++++
.../org/apache/lens/driver/es/ESDriver.java | 389 +++++++++++++
.../apache/lens/driver/es/ESDriverConfig.java | 122 ++++
.../java/org/apache/lens/driver/es/ESQuery.java | 55 ++
.../apache/lens/driver/es/client/ESClient.java | 169 ++++++
.../lens/driver/es/client/ESResultSet.java | 76 +++
.../driver/es/client/jest/JestClientImpl.java | 94 +++
.../client/jest/JestResultSetTransformer.java | 215 +++++++
.../driver/es/exceptions/ESClientException.java | 62 ++
.../es/exceptions/InvalidQueryException.java | 65 +++
.../es/translator/ASTCriteriaVisitor.java | 48 ++
.../lens/driver/es/translator/ASTVisitor.java | 47 ++
.../es/translator/CriteriaVisitorFactory.java | 26 +
.../lens/driver/es/translator/ESVisitor.java | 178 ++++++
.../es/translator/impl/ESAggregateVisitor.java | 106 ++++
.../es/translator/impl/ESCriteriaVisitor.java | 100 ++++
.../impl/ESCriteriaVisitorFactory.java | 29 +
.../es/translator/impl/ESTermVisitor.java | 94 +++
.../src/main/resources/esdriver-default.xml | 62 ++
.../org/apache/lens/driver/es/ESDriverTest.java | 43 ++
.../org/apache/lens/driver/es/MockClientES.java | 145 +++++
.../lens/driver/es/QueryTranslationTest.java | 134 +++++
.../driver/es/ResultSetTransformationTest.java | 573 +++++++++++++++++++
.../lens/driver/es/ScrollingQueryTest.java | 93 +++
.../src/test/resources/invalid-queries.data | 18 +
.../src/test/resources/valid-queries.data | 67 +++
.../lens/server/api/LensConfConstants.java | 10 +
lens-server/pom.xml | 5 +
pom.xml | 34 ++
src/site/apt/admin/config-server.apt | 19 +-
src/site/apt/admin/esdriver-config.apt | 41 ++
33 files changed, 3570 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-api/src/test/java/org/apache/lens/doc/TestGenerateConfigDoc.java
----------------------------------------------------------------------
diff --git a/lens-api/src/test/java/org/apache/lens/doc/TestGenerateConfigDoc.java b/lens-api/src/test/java/org/apache/lens/doc/TestGenerateConfigDoc.java
index 5ef4186..8025360 100644
--- a/lens-api/src/test/java/org/apache/lens/doc/TestGenerateConfigDoc.java
+++ b/lens-api/src/test/java/org/apache/lens/doc/TestGenerateConfigDoc.java
@@ -62,6 +62,11 @@ public class TestGenerateConfigDoc {
public static final String JDBC_DRIVER_CONF_FILE = "../lens-driver-jdbc/src/main/resources/jdbcdriver-default.xml";
/**
+ * The Constant ES_DRIVER_CONF_FILE.
+ */
+ public static final String ES_DRIVER_CONF_FILE = "../lens-driver-es/src/main/resources/esdriver-default.xml";
+
+ /**
* The Constant CLIENT_CONF_FILE.
*/
public static final String CLIENT_CONF_FILE = "../lens-client/src/main/resources/lens-client-default.xml";
@@ -92,6 +97,11 @@ public class TestGenerateConfigDoc {
public static final String JDBC_DRIVER_APT_FILE = "../src/site/apt/admin/jdbcdriver-config.apt";
/**
+ * The Constant ES_DRIVER_APT_FILE.
+ */
+ public static final String ES_DRIVER_APT_FILE = "../src/site/apt/admin/esdriver-config.apt";
+
+ /**
* The Constant CLIENT_APT_FILE.
*/
public static final String CLIENT_APT_FILE = "../src/site/apt/user/client-config.apt";
@@ -146,6 +156,17 @@ public class TestGenerateConfigDoc {
}
/**
+ * Runs ConfigPrinter with ES_DRIVER_CONF_FILE and ES_DRIVER_APT_FILE to generate the "ES driver configuration" doc.
+ *
+ * @throws Exception the exception
+ */
+ @Test
+ public void generateESdriverConfigDoc() throws Exception {
+ ConfigPrinter printer = new ConfigPrinter(ES_DRIVER_CONF_FILE, ES_DRIVER_APT_FILE);
+ printer.generateDoc("ES driver configuration");
+ }
+
+ /**
* Generate client config doc.
*
* @throws Exception the exception
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/pom.xml
----------------------------------------------------------------------
diff --git a/lens-driver-es/pom.xml b/lens-driver-es/pom.xml
new file mode 100644
index 0000000..7cff3ff
--- /dev/null
+++ b/lens-driver-es/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.lens</groupId>
+ <artifactId>apache-lens</artifactId>
+ <version>2.3.0-beta-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>lens-driver-es</artifactId>
+ <packaging>jar</packaging>
+ <description>ES execution driver</description>
+
+ <name>Lens Elastic Search Driver</name>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-service</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lens</groupId>
+ <artifactId>lens-cube</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <artifactId>lens-api</artifactId>
+ <groupId>org.apache.lens</groupId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.searchbox</groupId>
+ <artifactId>jest</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/ASTTraverserForES.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/ASTTraverserForES.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/ASTTraverserForES.java
new file mode 100644
index 0000000..5e082e5
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/ASTTraverserForES.java
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.lens.cube.parse.HQLParser;
+import org.apache.lens.driver.es.exceptions.InvalidQueryException;
+import org.apache.lens.driver.es.translator.ASTCriteriaVisitor;
+import org.apache.lens.driver.es.translator.ASTVisitor;
+import org.apache.lens.driver.es.translator.CriteriaVisitorFactory;
+import org.apache.lens.server.api.error.LensException;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.HiveParser;
+
+import com.google.common.collect.Lists;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * This traverses ASTNode in inorder fashion.
+ * More visitors (translation/validation) can be added.
+ * Any SQL query can be converted to ASTNode and can be traversed using this traversal
+ *
+ * Currently this traversal is limited for elastic search. So naming it this way.
+ *
+ * Look at the constructor for usage.
+ */
+@RequiredArgsConstructor
+public final class ASTTraverserForES {
+
+ /**
+ * the root node of the ASTNode
+ */
+ @NonNull
+ private final ASTNode rootQueryNode;
+ /**
+ * The basic query visitor
+ */
+ @NonNull
+ private final ASTVisitor visitor;
+ /**
+ * the criteria visitor factory,
+ * traversal has to create multiple criteria visitor objects for
+ * nested criteria. The impl of factory would determine the type of
+ * criteria visitor
+ */
+ @NonNull
+ private final CriteriaVisitorFactory criteriaVisitorFactory;
+
+ /**
+  * Entry point of the traversal. Visits the parts of the query in a fixed order
+  * (selects, table name, criteria, group by, order by, limit) and then signals
+  * the visitor that the visit is complete.
+  *
+  * @throws InvalidQueryException if any of the traversal steps rejects the query
+  */
+ public void accept() throws InvalidQueryException {
+ traverseSelects();
+ traverseTableName();
+ traverseCriteria();
+ traverseGroupBy();
+ traverseOrderBy();
+ traverseLimit();
+ visitor.completeVisit();
+ }
+
+ /**
+  * Visits every select expression found under TOK_INSERT/TOK_SELECT and dispatches it to the
+  * visitor as an aggregation, an all-columns reference or a simple column select.
+  *
+  * @throws InvalidQueryException if the query has no select node, if a select expression is of
+  *                               an unsupported kind, or if the traversal fails for another reason
+  */
+ public void traverseSelects() throws InvalidQueryException {
+   final ASTNode selectNode = HQLParser.findNodeByPath(rootQueryNode, HiveParser.TOK_INSERT, HiveParser.TOK_SELECT);
+   if (selectNode == null) {
+     throw new InvalidQueryException("No columns are selected!");
+   }
+   try {
+     for (Node selectExp : selectNode.getChildren()) {
+       final Node innerNode = Helper.getFirstChild(selectExp);
+       final String alias = Helper.getAliasFromSelectExpr(selectExp);
+       if (innerNode.getName().equals(String.valueOf(HiveParser.TOK_FUNCTION))) {
+         // An aggregation is expected to be <function>(<column>): exactly two children.
+         Validate.isTrue(innerNode.getChildren().size() == 2);
+         visitor.visitAggregation(
+           Helper.getFirstChild(innerNode).toString(),
+           Helper.getColumnNameFrom(innerNode.getChildren().get(1)),
+           alias
+         );
+       } else if (innerNode.getName().equals(String.valueOf(HiveParser.TOK_ALLCOLREF))) {
+         visitor.visitAllCols();
+       } else if (innerNode.getName().equals(String.valueOf(HiveParser.TOK_TABLE_OR_COL))
+         || innerNode.toString().equals(".")) {
+         visitor.visitSimpleSelect(
+           Helper.getColumnNameFrom(innerNode),
+           alias
+         );
+       } else {
+         throw new InvalidQueryException(selectExp.getName() + " seems to be invalid");
+       }
+     }
+   } catch (InvalidQueryException e) {
+     // Already carries a specific diagnosis (e.g. the unsupported select expression above).
+     // Re-wrapping it in the generic message below would hide the real reason in the cause chain.
+     throw e;
+   } catch (Exception e) {
+     throw new InvalidQueryException("Exception while traversing select expressions", e);
+   }
+
+ }
+
+ /**
+  * Reads the index and type from the FROM clause (TOK_FROM/TOK_TABREF/TOK_TABNAME)
+  * and passes them to the visitor.
+  *
+  * @throws InvalidQueryException if the table name node is missing or does not have exactly two parts
+  */
+ private void traverseTableName() throws InvalidQueryException {
+   try {
+     final ASTNode tabName = HQLParser.findNodeByPath(rootQueryNode,
+       HiveParser.TOK_FROM, HiveParser.TOK_TABREF, HiveParser.TOK_TABNAME);
+     Validate.notNull(tabName, "Index and type not present");
+     Validate.isTrue(tabName.getChildren().size() == 2, "Index and type not present");
+     // First part is the index (db schema position), second is the type (table position).
+     visitor.visitFrom(tabName.getChild(0).getText(), tabName.getChild(1).getText());
+   } catch (Exception e) {
+     throw new InvalidQueryException("Error while traversing table name "
+       + "- Expected grammar .. from <index>.<type>", e);
+   }
+ }
+
+ /**
+  * Visits the where clause, if the query has one, by recursively translating it into a
+  * criteria visitor and handing that to the main visitor.
+  *
+  * @throws InvalidQueryException if the where clause cannot be translated
+  */
+ private void traverseCriteria() throws InvalidQueryException {
+   try {
+     final ASTNode criteriaNode = HQLParser.findNodeByPath(rootQueryNode,
+       HiveParser.TOK_INSERT, HiveParser.TOK_WHERE);
+     if (criteriaNode != null) {
+       visitor.visitCriteria(traverseCriteriaRecursively(Helper.getFirstChild(criteriaNode)));
+     }
+   } catch (InvalidQueryException e) {
+     // The recursive traversal already reports what exactly is unsupported; keep that message
+     // on top instead of burying it under the generic wrapper below.
+     throw e;
+   } catch (Exception e) {
+     throw new InvalidQueryException("Exception while traversing criteria", e);
+   }
+ }
+
+ /**
+  * Translates a single predicate node into a fresh criteria visitor.
+  *
+  * @param whereClause   the predicate node
+  * @param predicateInfo the predicate operator and kind (simple or between) detected for the node
+  * @return the criteria visitor that has visited the predicate
+  * @throws InvalidQueryException if the predicate kind has no handler or the left column cannot be read
+  */
+ private ASTCriteriaVisitor traversePredicate(Node whereClause, PredicateInfo predicateInfo)
+   throws InvalidQueryException {
+   final ASTCriteriaVisitor childVisitor = criteriaVisitorFactory.getInstance();
+   final ArrayList<String> rightExpressions = Lists.newArrayList();
+   final List<? extends Node> children = whereClause.getChildren();
+   final String leftCol;
+   switch (predicateInfo.predicateType) {
+   case BETWEEN:
+     // The between function node is read positionally: column at index 2, bounds at 3 and 4.
+     Validate.isTrue(children.size() == 5,
+       "Between predicate expects the form <col> between <lower> and <upper>");
+     rightExpressions.add(children.get(3).toString());
+     rightExpressions.add(children.get(4).toString());
+     // NOTE(review): this takes the second child of the column node, which presumably requires
+     // a qualified <table>.<col> reference - confirm against unqualified columns.
+     leftCol = children.get(2).getChildren().get(1).toString();
+     break;
+   case SIMPLE:
+     // First child is the left column, everything after it is a right-hand expression.
+     Validate.isTrue(children.size() > 1, "Atleast one right expression needed");
+     for (Node rightExp : children.subList(1, children.size())) {
+       rightExpressions.add(rightExp.toString());
+     }
+     leftCol = Helper.getLeftColFromPredicate(whereClause);
+     break;
+   default:
+     throw new InvalidQueryException("No handlers for predicate " + predicateInfo.predicateType);
+   }
+   childVisitor.visitPredicate(predicateInfo.predicateOp, leftCol, rightExpressions);
+   return childVisitor;
+ }
+
+ /**
+  * Translates a logical operator node: every child is translated recursively first, then the
+  * operator itself is visited with the resulting child visitors.
+  *
+  * @param whereClause   the logical operator node
+  * @param logicalOpInfo the operator text and arity (unary or binary) detected for the node
+  * @return the criteria visitor that has visited the operator
+  * @throws InvalidQueryException if any operand cannot be translated
+  */
+ private ASTCriteriaVisitor traverseLogicalOperator(Node whereClause, LogicalOpInfo logicalOpInfo)
+   throws InvalidQueryException {
+   final ASTCriteriaVisitor opVisitor = criteriaVisitorFactory.getInstance();
+   final List<ASTCriteriaVisitor> operandVisitors = Lists.newArrayList();
+   for (Node operand : whereClause.getChildren()) {
+     operandVisitors.add(traverseCriteriaRecursively(operand));
+   }
+   if (logicalOpInfo.logicalOpType == LogicalOpType.UNARY) {
+     // A unary operator only looks at its first operand.
+     opVisitor.visitUnaryLogicalOp(logicalOpInfo.logicalOperator, operandVisitors.get(0));
+   } else if (logicalOpInfo.logicalOpType == LogicalOpType.BINARY) {
+     opVisitor.visitLogicalOp(logicalOpInfo.logicalOperator, operandVisitors);
+   }
+   return opVisitor;
+ }
+
+ /**
+  * Classifies a where-clause node as a predicate or a logical operator and delegates to the
+  * matching traversal.
+  *
+  * @param whereClause the node to translate
+  * @return the criteria visitor produced for the node
+  * @throws InvalidQueryException if the node is neither a predicate nor a logical operator
+  */
+ private ASTCriteriaVisitor traverseCriteriaRecursively(Node whereClause) throws InvalidQueryException {
+   final CriteriaInfo info = Helper.getCriteriaInfo(whereClause);
+   if (info.criteriaType == CriteriaType.PREDICATE) {
+     return traversePredicate(whereClause, (PredicateInfo) info);
+   }
+   if (info.criteriaType == CriteriaType.LOGICAL) {
+     return traverseLogicalOperator(whereClause, (LogicalOpInfo) info);
+   }
+   throw new InvalidQueryException("Expecting a predicate or logical operator but got this "
+     + whereClause.toString());
+ }
+
+ /**
+  * Visits each group by column, if the query has a group by clause.
+  *
+  * @throws InvalidQueryException if the group by clause cannot be read
+  */
+ private void traverseGroupBy() throws InvalidQueryException {
+   try {
+     final ASTNode groupByNode = HQLParser.findNodeByPath(rootQueryNode,
+       HiveParser.TOK_INSERT, HiveParser.TOK_GROUPBY);
+     if (groupByNode == null) {
+       return;
+     }
+     for (Node groupByColumn : groupByNode.getChildren()) {
+       visitor.visitGroupBy(Helper.getColumnNameFrom(groupByColumn));
+     }
+   } catch (Exception e) {
+     throw new InvalidQueryException("Exception while parsing group by", e);
+   }
+ }
+
+ /**
+  * Visits each order by column together with its direction, if the query has an order by clause.
+  * A column is descending when its node is TOK_TABSORTCOLNAMEDESC, ascending otherwise.
+  *
+  * @throws InvalidQueryException if the order by clause cannot be read
+  */
+ private void traverseOrderBy() throws InvalidQueryException {
+   try {
+     final ASTNode orderByNode = HQLParser.findNodeByPath(rootQueryNode,
+       HiveParser.TOK_INSERT, HiveParser.TOK_ORDERBY);
+     if (orderByNode == null) {
+       return;
+     }
+     for (Node orderBy : orderByNode.getChildren()) {
+       final String column = Helper.getColumnNameFrom(Helper.getFirstChild(orderBy));
+       final ASTVisitor.OrderBy direction;
+       if (orderBy.getName().equals(String.valueOf(HiveParser.TOK_TABSORTCOLNAMEDESC))) {
+         direction = ASTVisitor.OrderBy.DESC;
+       } else {
+         direction = ASTVisitor.OrderBy.ASC;
+       }
+       visitor.visitOrderBy(column, direction);
+     }
+   } catch (Exception e) {
+     throw new InvalidQueryException("Exception while parsing order by", e);
+   }
+ }
+
+ /**
+  * Visits the limit, if the query has one. The first child of the limit node is parsed as an int.
+  *
+  * @throws InvalidQueryException if the limit is missing its value or the value is not an int
+  */
+ private void traverseLimit() throws InvalidQueryException {
+   try {
+     final ASTNode limitNode = HQLParser.findNodeByPath(rootQueryNode,
+       HiveParser.TOK_INSERT, HiveParser.TOK_LIMIT);
+     if (limitNode == null) {
+       return;
+     }
+     final String limitText = Helper.getFirstChild(limitNode).toString();
+     visitor.visitLimit(Integer.parseInt(limitText));
+   } catch (Exception e) {
+     throw new InvalidQueryException("Error while parsing limit, format should be limit <int>", e);
+   }
+ }
+
+ private enum PredicateType {SIMPLE, BETWEEN};
+ private enum CriteriaType {PREDICATE, LOGICAL}
+ private enum LogicalOpType {UNARY, BINARY}
+ private static class CriteriaInfo {
+ final CriteriaType criteriaType;
+
+ public CriteriaInfo(CriteriaType criteriaType) {
+ this.criteriaType = criteriaType;
+ }
+ }
+ private static class LogicalOpInfo extends CriteriaInfo{
+ final String logicalOperator;
+ final LogicalOpType logicalOpType;
+
+ public LogicalOpInfo(String logicalOperator, LogicalOpType logicalOpType) {
+ super(CriteriaType.LOGICAL);
+ this.logicalOperator = logicalOperator;
+ this.logicalOpType = logicalOpType;
+ }
+ }
+ private static class PredicateInfo extends CriteriaInfo {
+ final PredicateType predicateType;
+ final String predicateOp;
+
+
+ public PredicateInfo(String operator, PredicateType predicateType) {
+ super(CriteriaType.PREDICATE);
+ this.predicateType = predicateType;
+ this.predicateOp = operator;
+ }
+ }
+
+ /**
+  * Stateless AST helpers used by the traversal: operator classification and extraction of
+  * aliases, first children and column names from nodes.
+  */
+ private static class Helper {
+
+   // Operator spellings, matched against the text of a where-clause node. Never modified.
+   private static final List<String> PREDICATES = Lists.newArrayList("=", ">", "<", "<=", ">=", "between");
+   private static final List<String> UNARY_LOGICAL_OPS = Lists.newArrayList("not", "!");
+   private static final List<String> BINARY_LOGICAL_OPS = Lists.newArrayList("and", "or", "&", "|", "&&", "||");
+
+   /**
+    * Returns the alias of a select expression, i.e. its second child, or null when the
+    * expression does not have exactly two children.
+    */
+   private static String getAliasFromSelectExpr(Node selectExp) {
+     return selectExp.getChildren().size() == 2
+       ?
+       selectExp.getChildren().get(1).toString()
+       :
+       null;
+   }
+
+   /**
+    * Classifies a where-clause node by its text as a unary/binary logical operator, a simple
+    * predicate, or a between predicate (a TOK_FUNCTION whose first child is "between").
+    *
+    * @throws InvalidQueryException if the node matches none of the supported kinds
+    */
+   private static CriteriaInfo getCriteriaInfo(Node whereClause) throws InvalidQueryException {
+     String whereRoot = whereClause.toString();
+     if (UNARY_LOGICAL_OPS.contains(whereRoot)) {
+       return new LogicalOpInfo(whereRoot, LogicalOpType.UNARY);
+     } else if (BINARY_LOGICAL_OPS.contains(whereRoot)) {
+       return new LogicalOpInfo(whereRoot, LogicalOpType.BINARY);
+     } else if (PREDICATES.contains(whereRoot)) {
+       return new PredicateInfo(whereRoot, PredicateType.SIMPLE);
+     } else if (whereRoot.equals("TOK_FUNCTION") && whereClause.getChildren().get(0).toString().equals("between")) {
+       return new PredicateInfo("between", PredicateType.BETWEEN);
+     } else {
+       throw new InvalidQueryException("Could not get criteria info for where clause " + whereRoot);
+     }
+   }
+
+   /**
+    * Returns the first child of a node.
+    *
+    * @throws LensException if the node is null or has no children
+    */
+   private static Node getFirstChild(Node node) throws LensException {
+     // Checked explicitly rather than by catching the NPE / IndexOutOfBoundsException, so that
+     // a null node is reported as a LensException instead of failing while building the message.
+     final List<? extends Node> children = node == null ? null : node.getChildren();
+     if (children == null || children.isEmpty()) {
+       throw new LensException("Expecting a non empty first child for " + node);
+     }
+     return children.get(0);
+   }
+
+   /**
+    * Returns the column name of the left operand (first child) of a predicate node.
+    *
+    * @throws InvalidQueryException if the left operand cannot be read as a column
+    */
+   private static String getLeftColFromPredicate(Node predicateNode) throws InvalidQueryException {
+     try {
+       return getColumnNameFrom(getFirstChild(predicateNode));
+     } catch (Exception e) {
+       throw new InvalidQueryException("Only simple predicates of the grammar <col>=<val> is supported as of now", e);
+     }
+   }
+
+   /**
+    * Renders a column node as an infix string and strips parentheses and spaces from it.
+    */
+   private static String getColumnNameFrom(Node columnNode) {
+     final StringBuilder stringBuilder = new StringBuilder();
+     HQLParser.toInfixString((ASTNode) columnNode, stringBuilder);
+     return stringBuilder.toString().replaceAll("[() ]", "");
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESDriver.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESDriver.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESDriver.java
new file mode 100644
index 0000000..14d9f99
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESDriver.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lens.api.query.QueryHandle;
+import org.apache.lens.api.query.QueryPrepareHandle;
+import org.apache.lens.cube.metadata.CubeMetastoreClient;
+import org.apache.lens.cube.parse.HQLParser;
+import org.apache.lens.driver.es.client.ESClient;
+import org.apache.lens.driver.es.client.ESResultSet;
+import org.apache.lens.driver.es.client.jest.JestClientImpl;
+import org.apache.lens.driver.es.translator.ESVisitor;
+import org.apache.lens.server.api.LensConfConstants;
+import org.apache.lens.server.api.driver.*;
+import org.apache.lens.server.api.error.LensException;
+import org.apache.lens.server.api.events.LensEventListener;
+import org.apache.lens.server.api.query.AbstractQueryContext;
+import org.apache.lens.server.api.query.PreparedQueryContext;
+import org.apache.lens.server.api.query.QueryContext;
+import org.apache.lens.server.api.query.collect.WaitingQueriesSelectionPolicy;
+import org.apache.lens.server.api.query.constraint.QueryLaunchingConstraint;
+import org.apache.lens.server.api.query.cost.FactPartitionBasedQueryCost;
+import org.apache.lens.server.api.query.cost.QueryCost;
+
+import org.apache.commons.lang.Validate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.HiveParser;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.antlr.runtime.CommonToken;
+import org.antlr.runtime.tree.Tree;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Driver for elastic search
+ */
+@Slf4j
+public class ESDriver implements LensDriver {
+
+ private static final AtomicInteger THID = new AtomicInteger();
+ private static final double STREAMING_PARTITION_COST = 0;
+ private static final QueryCost ES_DRIVER_COST = new FactPartitionBasedQueryCost(STREAMING_PARTITION_COST);
+
+ private Configuration conf;
+ private ESClient esClient;
+ private ExecutorService asyncQueryPool;
+ private ESDriverConfig config;
+
+ /**
+ * States
+ */
+ private final Map<String, ESQuery> rewrittenQueriesCache = Maps.newConcurrentMap();
+ private final Map<QueryHandle, Future<LensResultSet>> resultSetMap = Maps.newConcurrentMap();
+ private final Map<QueryHandle, QueryCompletionListener> handleListenerMap = Maps.newConcurrentMap();
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public QueryCost estimate(AbstractQueryContext qctx) {
+ return ES_DRIVER_COST;
+ }
+
+ @Override
+ public DriverQueryPlan explain(final AbstractQueryContext context) throws LensException {
+ final ESQuery esQuery = rewrite(context);
+ final String jsonExplanation = esClient.explain(esQuery);
+ if (jsonExplanation == null) {
+ throw new LensException("Explanation failed, empty json was returned");
+ }
+ return new DriverQueryPlan() {
+ @Override
+ public String getPlan() {
+ return jsonExplanation;
+ }
+
+ @Override
+ public QueryCost getCost() {
+ return ES_DRIVER_COST;
+ }
+ };
+ }
+
+ @Override
+ public void prepare(PreparedQueryContext pContext) throws LensException {
+ rewrite(pContext);
+ }
+
+ @Override
+ public DriverQueryPlan explainAndPrepare(PreparedQueryContext pContext) throws LensException {
+ prepare(pContext);
+ return explain(pContext);
+ }
+
+ @Override
+ public void closePreparedQuery(QueryPrepareHandle handle) {
+ /**
+ * Elastic search does not have a concept of prepared query.
+ */
+ }
+
+ @Override
+ public LensResultSet execute(QueryContext context) throws LensException {
+ final ESQuery esQuery = rewrite(context);
+ final QueryHandle queryHandle = context.getQueryHandle();
+ final ESResultSet resultSet = esClient.execute(esQuery);
+ notifyComplIfRegistered(queryHandle);
+ return resultSet;
+ }
+
+ @Override
+ public void executeAsync(final QueryContext context) {
+ final Future<LensResultSet> futureResult
+ = asyncQueryPool.submit(new ESQueryExecuteCallable(context, SessionState.get()));
+ resultSetMap.put(context.getQueryHandle(), futureResult);
+ }
+
+ @Override
+ public void registerForCompletionNotification(QueryHandle handle, long timeoutMillis,
+ QueryCompletionListener listener) {
+ handleListenerMap.put(handle, listener);
+ }
+
+ @Override
+ public void updateStatus(QueryContext context) {
+ final QueryHandle queryHandle = context.getQueryHandle();
+ final Future<LensResultSet> lensResultSetFuture = resultSetMap.get(queryHandle);
+ if (lensResultSetFuture == null) {
+ context.getDriverStatus().setState(DriverQueryStatus.DriverQueryState.CLOSED);
+ context.getDriverStatus().setStatusMessage(queryHandle + " closed");
+ context.getDriverStatus().setResultSetAvailable(false);
+ } else if (lensResultSetFuture.isDone()) {
+ context.getDriverStatus().setState(DriverQueryStatus.DriverQueryState.SUCCESSFUL);
+ context.getDriverStatus().setStatusMessage(queryHandle + " successful");
+ context.getDriverStatus().setResultSetAvailable(true);
+ } else if (lensResultSetFuture.isCancelled()) {
+ context.getDriverStatus().setState(DriverQueryStatus.DriverQueryState.CANCELED);
+ context.getDriverStatus().setStatusMessage(queryHandle + " cancelled");
+ context.getDriverStatus().setResultSetAvailable(false);
+ }
+ }
+
+ /**
+  * Returns the result set of an asynchronously submitted query, waiting for it if necessary.
+  * The future is removed from the driver as soon as it is fetched, so a result set can be
+  * fetched only once.
+  *
+  * @param context the query context
+  * @return the result set produced by the query
+  * @throws LensException if there is no pending result for the query, or if the execution
+  *                       failed, was cancelled or was interrupted
+  */
+ @Override
+ public LensResultSet fetchResultSet(QueryContext context) throws LensException {
+   final Future<LensResultSet> future = resultSetMap.remove(context.getQueryHandle());
+   if (future == null) {
+     throw new LensException("The results for the query "
+       + context.getQueryHandleString()
+       + " has already been fetched");
+   }
+   try {
+     return future.get();
+   } catch (InterruptedException e) {
+     // Restore the flag so that code further up the stack can still react to the interrupt.
+     Thread.currentThread().interrupt();
+     throw new LensException("Error fetching result set!", e);
+   } catch (ExecutionException | CancellationException e) {
+     throw new LensException("Error fetching result set!", e);
+   }
+ }
+
+ /**
+  * Drops the pending result of a query, if any.
+  *
+  * @param handle the query handle
+  * @throws LensException if the handle is null
+  */
+ @Override
+ public void closeResultSet(QueryHandle handle) throws LensException {
+   // The concurrent map rejects null keys; check up front instead of catching the NPE.
+   if (handle == null) {
+     throw new LensException("The query does not exist or was already purged");
+   }
+   resultSetMap.remove(handle);
+ }
+
+ /**
+  * Attempts to cancel a running query, interrupting its worker thread, and notifies the
+  * registered listener when the cancellation succeeded.
+  *
+  * @param handle the query handle
+  * @return true if the query was cancelled
+  * @throws LensException if the driver holds no pending result for the handle
+  */
+ @Override
+ public boolean cancelQuery(QueryHandle handle) throws LensException {
+   // Looked up explicitly: catching NullPointerException here would also hide NPEs raised
+   // while cancelling or notifying.
+   final Future<LensResultSet> future = handle == null ? null : resultSetMap.get(handle);
+   if (future == null) {
+     throw new LensException("The query does not exist or was already purged");
+   }
+   final boolean cancelled = future.cancel(true);
+   if (cancelled) {
+     notifyQueryCancellation(handle);
+   }
+   return cancelled;
+ }
+
+ /**
+  * Cancels the query and releases everything the driver holds for it (pending result and
+  * completion listener).
+  *
+  * @param handle the query handle
+  * @throws LensException if the query cannot be cancelled, e.g. because it is no longer known
+  */
+ @Override
+ public void closeQuery(QueryHandle handle) throws LensException {
+   try {
+     cancelQuery(handle);
+   } finally {
+     // cancelQuery throws when the result was already fetched or purged; the listener must
+     // still be released in that case, otherwise it stays registered forever.
+     if (handle != null) {
+       closeResultSet(handle);
+       handleListenerMap.remove(handle);
+     }
+   }
+ }
+
+ /**
+  * Closes every query the driver still tracks and stops the async execution pool.
+  * Failures while closing individual queries are logged and do not abort the shutdown.
+  */
+ @Override
+ public void close() throws LensException {
+   for (QueryHandle handle : resultSetMap.keySet()) {
+     try {
+       closeQuery(handle);
+     } catch (LensException e) {
+       log.error("Error while closing query {}", handle.getHandleIdString(), e);
+     }
+   }
+   // The pool is only created in configure(); guard against close() on an unconfigured driver.
+   if (asyncQueryPool != null) {
+     asyncQueryPool.shutdownNow();
+   }
+ }
+
+ @Override
+ public void registerDriverEventListener(LensEventListener<DriverEvent> driverEventListener) {
+
+ }
+
+ @Override
+ public ImmutableSet<QueryLaunchingConstraint> getQueryConstraints() {
+ return ImmutableSet.copyOf(Sets.<QueryLaunchingConstraint>newHashSet());
+ }
+
+ @Override
+ public ImmutableSet<WaitingQueriesSelectionPolicy> getWaitingQuerySelectionPolicies() {
+ return ImmutableSet.copyOf(Sets.<WaitingQueriesSelectionPolicy>newHashSet());
+ }
+
+ private void notifyComplIfRegistered(QueryHandle queryHandle) {
+ try {
+ handleListenerMap.get(queryHandle).onCompletion(queryHandle);
+ } catch (NullPointerException e) {
+ log.debug("There are no subscriptions for notification. Skipping for {}", queryHandle.getHandleIdString(), e);
+ }
+ }
+
+ private void notifyQueryCancellation(QueryHandle handle) {
+ try {
+ handleListenerMap.get(handle).onError(handle, handle + " cancelled");
+ } catch (NullPointerException e) {
+ log.debug("There are no subscriptions for notification. Skipping for {}", handle.getHandleIdString(), e);
+ }
+ }
+
+ /**
+  * Translates the driver query of the context into an ES query. Translations are cached by
+  * {@code keyFor(context)}, so the same driver query is parsed and rewritten only once.
+  *
+  * @param context the query context
+  * @return the cached or freshly built ES query
+  * @throws LensException if the driver query cannot be parsed or rewritten
+  */
+ private ESQuery rewrite(AbstractQueryContext context) throws LensException {
+   final String cacheKey = keyFor(context);
+   final ESQuery cached = rewrittenQueriesCache.get(cacheKey);
+   if (cached != null) {
+     return cached;
+   }
+   final ASTNode queryRoot = HQLParser.parseHQL(context.getDriverQuery(this), new HiveConf());
+   setIndexAndTypeIfNotPresent(context, queryRoot);
+   final ESQuery rewritten = ESVisitor.rewrite(config, queryRoot);
+   rewrittenQueriesCache.put(cacheKey, rewritten);
+   return rewritten;
+ }
+
+ /**
+  * Makes sure the table name node of the query has the form {@code <index>.<type>}.
+  * When only a single table name is given, the table is looked up in the metastore and its
+  * ES index/type properties are written into the AST: the existing child is renamed to the
+  * type and the index is inserted in front of it.
+  *
+  * @param context       the query context, used for the hive conf of the metastore client
+  * @param rootQueryNode the root of the parsed query, modified in place
+  * @throws LensException if the metastore lookup fails
+  */
+ private void setIndexAndTypeIfNotPresent(AbstractQueryContext context, ASTNode rootQueryNode) throws LensException {
+   final ASTNode dbSchemaTable = HQLParser.findNodeByPath(
+     rootQueryNode,
+     HiveParser.TOK_FROM,
+     HiveParser.TOK_TABREF,
+     HiveParser.TOK_TABNAME);
+   try {
+     Validate.notNull(dbSchemaTable);
+     if (dbSchemaTable.getChildren().size() == 2) {
+       // Index and type are already set here
+       return;
+     }
+     // Get the table name, check metastore and set index and actual table name
+     final Tree firstChild = dbSchemaTable.getChild(0);
+     final String lensTable = firstChild.getText();
+     final Table tbl = CubeMetastoreClient.getInstance(context.getHiveConf()).getHiveTable(lensTable);
+     final String index = tbl.getProperty(LensConfConstants.ES_INDEX_NAME);
+     final String type = tbl.getProperty(LensConfConstants.ES_TYPE_NAME);
+     Validate.notNull(index, LensConfConstants.ES_INDEX_NAME + " property missing in table definition");
+     Validate.notNull(type, LensConfConstants.ES_TYPE_NAME + " property missing in table definition");
+     ((ASTNode) firstChild).getToken().setText(type);
+     final ASTNode indexIdentifier = new ASTNode(new CommonToken(HiveParser.Identifier, index));
+     indexIdentifier.setParent(dbSchemaTable);
+     dbSchemaTable.insertChild(0, indexIdentifier);
+   } catch (HiveException e) {
+     // Keep the metastore failure as the cause; without it the reason is lost.
+     throw new LensException("Error occured when trying to communicate with metastore", e);
+   }
+ }
+
+ private String keyFor(AbstractQueryContext context) {
+ return String.valueOf(context.getFinalDriverQuery(this)!=null) + ":" + context.getDriverQuery(this);
+ }
+
+ ESClient getESClient() {
+ return esClient;
+ }
+
+ @Override
+ public void configure(Configuration conf) throws LensException {
+ this.conf = new Configuration(conf);
+ this.conf.addResource("esdriver-default.xml");
+ this.conf.addResource("esdriver-site.xml");
+ config = new ESDriverConfig(this.conf);
+ Class klass;
+ try {
+ klass = Class.forName(this.conf.get(ESDriverConfig.CLIENT_CLASS_KEY));
+ if (klass != null) {
+ log.debug("Picked up class {}", klass);
+ if (ESClient.class.isAssignableFrom(klass)) {
+ final Constructor constructor = klass.getConstructor(ESDriverConfig.class, Configuration.class);
+ esClient = (ESClient) constructor.newInstance(config, this.conf);
+ log.debug("Successfully instantiated es client of type {}", klass);
+ }
+ } else {
+ log.debug("Client class not provided, falling back to the default Jest client");
+ esClient = new JestClientImpl(config, conf);
+ }
+ } catch (ClassNotFoundException
+ | NoSuchMethodException
+ | InstantiationException
+ | IllegalAccessException
+ | InvocationTargetException e) {
+ log.error("ES driver cannot start!", e);
+ throw new LensException("Cannot start es driver", e);
+ }
+ log.debug("ES Driver configured");
+ asyncQueryPool = Executors.newCachedThreadPool(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable runnable) {
+ Thread th = new Thread(runnable);
+ th.setName("lens-driver-es-" + THID.incrementAndGet());
+ return th;
+ }
+ });
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ /**
+ * This flow could be abstracted out at the driver level
+ */
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ /**
+ * This flow could be abstracted out at the driver level
+ */
+ }
+
+ /**
+  * Task submitted to the async pool by executeAsync. It carries the session state captured
+  * on the submitting thread and runs the enclosing driver's execute for the query.
+  */
+ protected class ESQueryExecuteCallable implements Callable<LensResultSet> {
+
+ private final QueryContext queryContext;
+ private final SessionState sessionState;
+
+ public ESQueryExecuteCallable(QueryContext queryContext, SessionState sessionState) {
+ this.queryContext = queryContext;
+ this.sessionState = sessionState;
+ }
+
+ @Override
+ public LensResultSet call() throws Exception {
+ // Install the captured session state on the worker thread before executing.
+ SessionState.setCurrentSessionState(sessionState);
+ return execute(queryContext);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESDriverConfig.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESDriverConfig.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESDriverConfig.java
new file mode 100644
index 0000000..df01fe6
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESDriverConfig.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es;
+
+import org.apache.lens.driver.es.translator.ASTVisitor;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.ImmutableMap;
+import lombok.Getter;
+
+/**
+ * The constants used by ESDriver and es re-writers
+ */
+public final class ESDriverConfig {
+ public static final String CLIENT_CLASS_KEY = "lens.driver.es.client.class";
+ public static final String MAX_ROW_SIZE_KEY = "lens.driver.es.max.row.size";
+ public static final String TERM_FETCH_SIZE_KEY = "lens.driver.es.term.fetch.size";
+ public static final String AGGR_BUCKET_SIZE_LENS_KEY = "lens.driver.es.aggr.bucket.size";
+ public static final String QUERY_TIME_OUT_LENS_KEY = "lens.driver.es.query.timeout.millis";
+
+ public static final String AGGS = "aggs";
+ public static final String MATCH_ALL = "match_all";
+ public static final String TERMS = "terms";
+ public static final String TERM = "term";
+ public static final String FIELD = "field";
+ public static final String FILTER = "filter";
+ public static final String FROM = "from";
+ public static final String RANGE = "range";
+ public static final String FILTER_WRAPPER = "filter_wrapper";
+ public static final String FIELDS = "fields";
+ public static final String TERM_SORT = "sort";
+ public static final String SIZE = "size";
+ public static final String QUERY_TIME_OUT_STRING = "timeout";
+
+ public static final ImmutableMap<String, String> LOGICAL_OPS;
+ public static final ImmutableMap<String, String> RANGE_PREDICATES;
+ public static final ImmutableMap<String, String> PREDICATES;
+ public static final ImmutableMap<String, String> AGGREGATIONS;
+ public static final ImmutableMap<ASTVisitor.OrderBy, String> ORDER_BYS;
+ public static final int AGGR_TERM_FETCH_SIZE = 0;
+ public static final int DEFAULT_TERM_QUERY_OFFSET = 0;
+ private static final int DEFAULT_TERM_QUERY_LIMIT = -1;
+ private static final int AGGR_BUCKET_SIZE_DEFAULT = 10000;
+ private static final int QUERY_TIME_OUT_MS_DEFAULT = 10000;
+
+ private static final int TERM_FETCH_SIZE_DEFAULT = 5000;
+
+ static {
+ final ImmutableMap.Builder<String, String> logicalOpsBuilder = ImmutableMap.builder();
+ logicalOpsBuilder.put("and", "and");
+ logicalOpsBuilder.put("&&", "and");
+ logicalOpsBuilder.put("&", "and");
+ logicalOpsBuilder.put("or", "or");
+ logicalOpsBuilder.put("||", "or");
+ logicalOpsBuilder.put("|", "or");
+ logicalOpsBuilder.put("!", "not");
+ LOGICAL_OPS = logicalOpsBuilder.build();
+
+ final ImmutableMap.Builder<String, String> predicatesBuilder = ImmutableMap.builder();
+ predicatesBuilder.put(">", "gt");
+ predicatesBuilder.put(">=", "gte");
+ predicatesBuilder.put("<", "lt");
+ predicatesBuilder.put("<=", "lte");
+ predicatesBuilder.put("between", "range");
+ RANGE_PREDICATES = predicatesBuilder.build();
+ predicatesBuilder.put("=", "term");
+ predicatesBuilder.put("in", "terms");
+ PREDICATES = predicatesBuilder.build();
+
+ final ImmutableMap.Builder<String, String> aggregationsBuilder = ImmutableMap.builder();
+ aggregationsBuilder.put("count", "value_count");
+ aggregationsBuilder.put("count_distinct", "cardinality");
+ aggregationsBuilder.put("max", "max");
+ aggregationsBuilder.put("sum", "sum");
+ aggregationsBuilder.put("min", "min");
+ aggregationsBuilder.put("avg", "avg");
+ aggregationsBuilder.put("percentile", "percentiles");
+ AGGREGATIONS = aggregationsBuilder.build();
+
+ final ImmutableMap.Builder<ASTVisitor.OrderBy, String> orderByBuilder = ImmutableMap.builder();
+ orderByBuilder.put(ASTVisitor.OrderBy.ASC, "asc");
+ orderByBuilder.put(ASTVisitor.OrderBy.DESC, "desc");
+ ORDER_BYS = orderByBuilder.build();
+ }
+
+ @Getter
+ private final int maxLimit;
+ @Getter
+ private final int aggrBucketSize;
+ @Getter
+ private final int queryTimeOutMs;
+ private final int termFetchSize;
+
+ public int getTermFetchSize() {
+ return termFetchSize;
+ }
+
+ public ESDriverConfig(Configuration conf) {
+ maxLimit = conf.getInt(MAX_ROW_SIZE_KEY, DEFAULT_TERM_QUERY_LIMIT);
+ aggrBucketSize = conf.getInt(AGGR_BUCKET_SIZE_LENS_KEY, AGGR_BUCKET_SIZE_DEFAULT);
+ queryTimeOutMs = conf.getInt(QUERY_TIME_OUT_LENS_KEY, QUERY_TIME_OUT_MS_DEFAULT);
+ termFetchSize = conf.getInt(TERM_FETCH_SIZE_KEY, TERM_FETCH_SIZE_DEFAULT);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESQuery.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESQuery.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESQuery.java
new file mode 100644
index 0000000..32e3156
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESQuery.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es;
+
+import com.google.common.collect.ImmutableList;
+import lombok.Data;
+
+/**
+ * Immutable description of one query to run against elastic search: where to run it, the
+ * query text itself and how the result columns are laid out.
+ * <p>
+ * All fields are final and Lombok's {@code @Data} generates the accessors and constructor.
+ * ESClient builds instances positionally in the field order declared below, so that order
+ * must not be changed.
+ */
+@Data
+public final class ESQuery {
+ /**
+ * Kind of query. ESClient executes AGGR queries in a single call and all other queries in
+ * batches.
+ */
+ public enum QueryType {AGGR, TERM}
+ /**
+ * Index on which the query will be executed
+ */
+ private final String index;
+ /**
+ * Type on which the query will be executed
+ */
+ private final String type;
+ /**
+ * The query text. ESClient parses it as a JSON object for batched execution.
+ */
+ private final String query;
+ /**
+ * Aliases in the query (ordered); used as the column names of the result set metadata
+ */
+ private final ImmutableList<String> schema;
+ /**
+ * Columns/expressions in the query (ordered)
+ */
+ private final ImmutableList<String> columns;
+ /**
+ * Type of the query, aggregate or term
+ */
+ private final QueryType queryType;
+ /**
+ * Maximum number of rows the query should return. ESClient's batch-size computation
+ * special-cases -1 as "no limit".
+ */
+ private final int limit;
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESClient.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESClient.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESClient.java
new file mode 100644
index 0000000..5363a94
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESClient.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.client;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.lens.api.query.ResultRow;
+import org.apache.lens.driver.es.ESDriverConfig;
+import org.apache.lens.driver.es.ESQuery;
+import org.apache.lens.driver.es.exceptions.ESClientException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import lombok.NonNull;
+
+public abstract class ESClient {
+
+ @NonNull
+ protected final ESDriverConfig esDriverConfig;
+
+ private ExecutionMode getExecutionModeFor(ESQuery esQuery) {
+ return esQuery.getQueryType().equals(ESQuery.QueryType.AGGR)
+ ?
+ new DefaultExecutionMode(esQuery)
+ :
+ new ScrollingExecutionMode(esQuery);
+ }
+
+ protected abstract ESResultSet executeImpl(ESQuery esQuery) throws ESClientException;
+
+ private abstract static class ExecutionMode {
+
+ @NonNull
+ protected final ESQuery esQuery;
+
+ ExecutionMode(ESQuery query) {
+ this.esQuery = query;
+ }
+
+ abstract ESResultSet executeInternal() throws ESClientException;
+
+ }
+
+ private class ScrollingExecutionMode extends ExecutionMode {
+
+ @NonNull
+ final JsonObject jsonQuery;
+
+ ScrollingExecutionMode(ESQuery query) {
+ super(query);
+ jsonQuery = (JsonObject) new JsonParser().parse(query.getQuery());
+ }
+
+ ESQuery modify(int offset, int limit) {
+ jsonQuery.addProperty(ESDriverConfig.FROM, offset);
+ jsonQuery.addProperty(ESDriverConfig.SIZE, limit);
+ return new ESQuery(
+ esQuery.getIndex(),
+ esQuery.getType(),
+ jsonQuery.toString(),
+ ImmutableList.copyOf(esQuery.getSchema()),
+ ImmutableList.copyOf(esQuery.getColumns()),
+ esQuery.getQueryType(),
+ esQuery.getLimit()
+ );
+ }
+
+
+ @Override
+ ESResultSet executeInternal() throws ESClientException{
+ final ESResultSet resultSet = executeImpl(esQuery);
+ final int fetchSize = esDriverConfig.getTermFetchSize();
+ final int limit = esQuery.getLimit();
+ return new ESResultSet(
+ limit,
+ new Iterable<ResultRow>() {
+ ESResultSet batch = resultSet;
+ int processed = 0;
+
+ final ESResultSet getNextBatch() throws ESClientException {
+ final int toProcess = limit - processed;
+ final int newFetchSize = limit == -1 || fetchSize < toProcess
+ ?
+ fetchSize
+ :
+ toProcess;
+ batch = executeImpl(modify(processed, newFetchSize));
+ return batch;
+ }
+
+
+ @Override
+ public Iterator<ResultRow> iterator() {
+ return new Iterator<ResultRow>() {
+ @Override
+ public boolean hasNext() {
+ try {
+ return processed < limit
+ && (batch.hasNext() || getNextBatch().size > 0);
+ } catch (ESClientException e) {
+ throw new RuntimeException("Encountered a runtime issue during execution", e);
+ }
+ }
+
+ @Override
+ public ResultRow next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException("Processed : " + processed + ", Limit : " + limit);
+ }
+ final ResultRow nextRow = batch.next();
+ processed++;
+ return nextRow;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Cannot remove from es resultset!");
+ }
+ };
+ }
+ },
+ resultSet.getMetadata()
+ );
+ }
+ }
+
+ private class DefaultExecutionMode extends ExecutionMode {
+
+ DefaultExecutionMode(ESQuery query) {
+ super(query);
+ }
+
+ @Override
+ ESResultSet executeInternal() throws ESClientException {
+ return executeImpl(esQuery);
+ }
+
+ }
+
+ public ESClient(ESDriverConfig esDriverConfig, Configuration conf) {
+ this.esDriverConfig = esDriverConfig;
+ }
+
+ public final ESResultSet execute(final ESQuery esQuery) throws ESClientException {
+ return getExecutionModeFor(esQuery).executeInternal();
+ }
+
+ public abstract String explain(ESQuery esQuery) throws ESClientException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESResultSet.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESResultSet.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESResultSet.java
new file mode 100644
index 0000000..4ba2321
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESResultSet.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.client;
+
+import java.util.Iterator;
+
+import org.apache.lens.api.query.ResultRow;
+import org.apache.lens.server.api.driver.InMemoryResultSet;
+import org.apache.lens.server.api.driver.LensResultSetMetadata;
+import org.apache.lens.server.api.error.LensException;
+
+import lombok.NonNull;
+
+/**
+ * Result set that hands out the rows of an elastic search query from an iterator.
+ * <p>
+ * The iterator is obtained once, at construction time, so the rows can only be walked through
+ * a single time.
+ */
+public class ESResultSet extends InMemoryResultSet {
+
+  /** Source of the rows; advanced by {@link #next()}. */
+  @NonNull
+  final Iterator<ResultRow> resultSetIterator;
+  /** Column metadata returned by {@link #getMetadata()}. */
+  @NonNull
+  final LensResultSetMetadata resultSetMetadata;
+  /** Size as given by the creator of this result set; it is not derived from the rows. */
+  final int size;
+
+  /**
+   * @param size              value to report from {@link #size()}
+   * @param resultSetIterable provider of the rows; its iterator is requested exactly once
+   * @param metadata          column metadata of the rows
+   */
+  public ESResultSet(int size, final Iterable<ResultRow> resultSetIterable, final LensResultSetMetadata metadata) {
+    this.resultSetMetadata = metadata;
+    this.resultSetIterator = resultSetIterable.iterator();
+    this.size = size;
+  }
+
+  @Override
+  public LensResultSetMetadata getMetadata() {
+    return resultSetMetadata;
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  @Override
+  public boolean hasNext(){
+    return resultSetIterator.hasNext();
+  }
+
+  @Override
+  public ResultRow next() {
+    return resultSetIterator.next();
+  }
+
+  /** The fetch size is ignored by this result set. */
+  @Override
+  public void setFetchSize(int i) {
+    // intentionally a no-op
+  }
+
+  /**
+   * Always reports that the result set was not rewound.
+   *
+   * @return {@code false}
+   */
+  @Override
+  public boolean seekToStart() throws LensException {
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/jest/JestClientImpl.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/jest/JestClientImpl.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/jest/JestClientImpl.java
new file mode 100644
index 0000000..35dc070
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/jest/JestClientImpl.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.client.jest;
+
+import org.apache.lens.driver.es.ESDriverConfig;
+import org.apache.lens.driver.es.ESQuery;
+import org.apache.lens.driver.es.client.ESClient;
+import org.apache.lens.driver.es.client.ESResultSet;
+import org.apache.lens.driver.es.exceptions.ESClientException;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.hadoop.conf.Configuration;
+
+import io.searchbox.client.JestClient;
+import io.searchbox.client.JestClientFactory;
+import io.searchbox.client.config.HttpClientConfig;
+import io.searchbox.core.Explain;
+import io.searchbox.core.Search;
+import io.searchbox.core.SearchResult;
+import lombok.NonNull;
+
+/**
+ * {@link ESClient} implementation that fires queries on elastic search through a Jest client.
+ * <p>
+ * The servers, the maximum number of connections and the multi-threading flag are read from
+ * the configuration passed to the constructor; the read timeout is the query timeout of the
+ * driver config.
+ */
+public class JestClientImpl extends ESClient {
+
+  private static final int DEFAULT_MAX_CONN = 10;
+  private static final boolean DEFAULT_MULTI_THREADED = true;
+  private static final String IS_MULTITHREADED = "lens.driver.es.jest.is.multi.threaded";
+  private static final String MAX_TOTAL_CONN = "lens.driver.es.jest.max.conn";
+  private static final String ES_SERVERS = "lens.driver.es.jest.servers";
+
+  @NonNull
+  private final JestClient client;
+
+  /**
+   * Builds the underlying Jest client.
+   *
+   * @param esDriverConfig driver settings; supplies the read timeout
+   * @param conf           configuration holding the jest specific settings
+   */
+  public JestClientImpl(ESDriverConfig esDriverConfig, Configuration conf) {
+    super(esDriverConfig, conf);
+    // NOTE(review): if getStringCollection returns an empty collection (rather than null) for
+    // an unset key, this check does not catch a missing server list - confirm.
+    final HttpClientConfig httpClientConfig = new HttpClientConfig
+      .Builder(Validate.notNull(conf.getStringCollection(ES_SERVERS)))
+      .maxTotalConnection(conf.getInt(MAX_TOTAL_CONN, DEFAULT_MAX_CONN))
+      .multiThreaded(conf.getBoolean(IS_MULTITHREADED, DEFAULT_MULTI_THREADED))
+      .readTimeout(esDriverConfig.getQueryTimeOutMs())
+      .build();
+    final JestClientFactory factory = new JestClientFactory();
+    factory.setHttpClientConfig(httpClientConfig);
+    client = factory.getObject();
+  }
+
+  /**
+   * Runs the query text on the index and type named by the query and converts the json
+   * response to a result set.
+   *
+   * @param esQuery query to execute
+   * @return the transformed response
+   * @throws ESClientException wrapping any failure, including a null response from the client
+   */
+  @Override
+  public ESResultSet executeImpl(ESQuery esQuery) throws ESClientException {
+    try {
+      final Search search = new Search.Builder(esQuery.getQuery())
+        .addIndex(esQuery.getIndex())
+        .addType(esQuery.getType())
+        .build();
+      final SearchResult result = client.execute(search);
+      if (result == null) {
+        throw new NullPointerException("Got null result from client for " + esQuery);
+      }
+      return JestResultSetTransformer.transformFrom(
+        result.getJsonObject(), esQuery.getSchema(), esQuery.getColumns());
+    } catch (Exception e) {
+      throw new ESClientException("Execution failed, ", e);
+    }
+  }
+
+  /**
+   * Asks elastic search to explain the query and returns the raw json response.
+   *
+   * @param esQuery query to explain
+   * @return the json string of the response
+   * @throws ESClientException wrapping any failure
+   */
+  @Override
+  public String explain(ESQuery esQuery) throws ESClientException {
+    try {
+      // NOTE(review): the third builder argument (presumably the document id) is null - confirm
+      // that the explain request is valid without it.
+      final Explain explain = new Explain
+        .Builder(esQuery.getIndex(), esQuery.getType(), null, esQuery.getQuery())
+        .build();
+      return client.execute(explain).getJsonString();
+    } catch (Exception e) {
+      throw new ESClientException("Explanation failed, ", e);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/jest/JestResultSetTransformer.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/jest/JestResultSetTransformer.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/jest/JestResultSetTransformer.java
new file mode 100644
index 0000000..af313da
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/jest/JestResultSetTransformer.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.client.jest;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lens.api.query.ResultRow;
+import org.apache.lens.driver.es.client.ESResultSet;
+import org.apache.lens.server.api.driver.LensResultSetMetadata;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.hive.service.cli.ColumnDescriptor;
+import org.apache.hive.service.cli.Type;
+import org.apache.hive.service.cli.TypeDescriptor;
+
+import com.google.common.collect.Lists;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import lombok.NonNull;
+
+/**
+ * Transforms the json responses of elastic search into the 2D result sets that lens expects.
+ * <p>
+ * Use {@link #transformFrom(JsonObject, List, List)}; it picks the transformer that matches
+ * the shape of the response. Every cell of the produced rows is a string, or {@code null}
+ * when the response carries no value (or a json null) for it.
+ */
+public abstract class JestResultSetTransformer {
+
+  /**
+   * The class AggregateTransformer, takes care of transforming results of
+   * aggregate queries
+   */
+  static class AggregateTransformer extends JestResultSetTransformer {
+
+    /** Rows collected so far by {@link #collectAllRows}. */
+    private final List<ResultRow> rows = Lists.newArrayList();
+
+    public AggregateTransformer(JsonObject result, List<String> schema, List<String> selectedColumns) {
+      super(result, schema, selectedColumns);
+    }
+
+    /**
+     * The json response from jest will be nested in case of group bys
+     *              g1-val          (group 1)
+     *              |     \
+     *        g2-val1     g2-val2   (group 2)
+     *           |           |
+     *         5, 10      10, 15    (metrics)
+     *
+     * The method will traverse the tree and collect all the paths ->
+     * keyCol1  keyCol2  aggr1  aggr2
+     * g1-val   g2-val1  5      10
+     * g1-val   g2-val2  10     15
+     *
+     * A row is emitted as soon as the number of cells filled on the current path equals the
+     * number of schema columns.
+     *
+     * @param object      json response of jest (or a sub tree of it)
+     * @param currentPath current row
+     * @param length      number of cells of the current row filled so far
+     * @param keyCol      col name of the keys that will be parsed in the recursion
+     */
+    private void collectAllRows(JsonObject object, List<Object> currentPath, int length, String keyCol) {
+      for (Map.Entry<String, JsonElement> entry : object.entrySet()) {
+        final JsonElement element = entry.getValue();
+        final String key = entry.getKey();
+        if (key.equals(ResultSetConstants.KEY_STRING)) {
+          // Bucket key: the value of the group by column this bucket list belongs to.
+          Validate.isTrue(keyCol != null, "Key not available");
+          currentPath.set(schema.indexOf(keyCol), element.getAsString());
+          length++;
+          if (length == schema.size()) {
+            rows.add(new ResultRow(Lists.newArrayList(currentPath)));
+          }
+        } else if (element instanceof JsonObject && ((JsonObject) element).get(ResultSetConstants.VALUE_KEY) != null) {
+          // Metric: its json value may be an explicit null, which becomes a null cell.
+          currentPath.set(schema.indexOf(key),
+            asStringOrNull(((JsonObject) element).get(ResultSetConstants.VALUE_KEY)));
+          length++;
+          if (length == schema.size()) {
+            rows.add(new ResultRow(Lists.newArrayList(currentPath)));
+          }
+        } else if (element instanceof JsonObject) {
+          // Nested aggregation: its name is the key column of the buckets below it.
+          collectAllRows((JsonObject) element, Lists.newArrayList(currentPath), length, key);
+        } else if (element instanceof JsonArray && key.equals(ResultSetConstants.BUCKETS_KEY)) {
+          // Every bucket continues from its own copy of the row built so far.
+          for (JsonElement bucket : (JsonArray) element) {
+            collectAllRows((JsonObject) bucket, Lists.newArrayList(currentPath), length, keyCol);
+          }
+        }
+      }
+    }
+
+    @Override
+    public ESResultSet transform() {
+      collectAllRows(
+        result.getAsJsonObject(ResultSetConstants.AGGREGATIONS_KEY)
+          .getAsJsonObject(ResultSetConstants.FILTER_WRAPPER_KEY),
+        getEmptyRow(),
+        0,
+        null
+      );
+      return new ESResultSet(
+        rows.size(),
+        rows,
+        getMetaData(schema)
+      );
+    }
+  }
+
+  /**
+   * The class TermTransformer, takes care of transforming results of simple select queries
+   */
+  static class TermTransformer extends JestResultSetTransformer {
+
+    public TermTransformer(JsonObject result, List<String> schema, List<String> selectedColumns) {
+      super(result, schema, selectedColumns);
+    }
+
+    /**
+     * Creates one row per hit; each returned field is stored at the position its name has in
+     * the selected columns.
+     */
+    @Override
+    public ESResultSet transform() {
+      final JsonArray hits = result
+        .getAsJsonObject(ResultSetConstants.HITS_KEY)
+        .getAsJsonArray(ResultSetConstants.HITS_KEY);
+
+      final List<ResultRow> rows = Lists.newArrayList();
+      for (JsonElement hit : hits) {
+        final List<Object> cells = getEmptyRow();
+        final JsonObject fields = hit.getAsJsonObject().getAsJsonObject(ResultSetConstants.FIELDS_KEY);
+        for (Map.Entry<String, JsonElement> field : fields.entrySet()) {
+          cells.set(selectedColumns.indexOf(field.getKey()), asStringOrNull(field.getValue()));
+        }
+        rows.add(new ResultRow(cells));
+      }
+      return new ESResultSet(rows.size(), rows, getMetaData(schema));
+    }
+
+  }
+
+  @NonNull
+  protected final JsonObject result;
+  @NonNull
+  protected final List<String> schema;
+  @NonNull
+  protected final List<String> selectedColumns;
+
+  /**
+   * @param result          json response to transform
+   * @param schema          aliases of the result columns (ordered)
+   * @param selectedColumns columns/expressions of the query (ordered)
+   */
+  public JestResultSetTransformer(JsonObject result, List<String> schema, List<String> selectedColumns) {
+    this.schema = schema;
+    this.result = result;
+    this.selectedColumns = selectedColumns;
+  }
+
+  /**
+   * Transforms the response with the aggregate transformer when it contains an "aggregations"
+   * object and with the term transformer otherwise.
+   *
+   * @param jsonResult      json response to transform
+   * @param schema          aliases of the result columns (ordered)
+   * @param selectedColumns columns/expressions of the query (ordered)
+   * @return the rows of the response together with their metadata
+   */
+  public static ESResultSet transformFrom(JsonObject jsonResult, List<String> schema, List<String> selectedColumns) {
+    if (jsonResult.getAsJsonObject(ResultSetConstants.AGGREGATIONS_KEY) != null) {
+      return new AggregateTransformer(jsonResult, schema, selectedColumns).transform();
+    } else {
+      return new TermTransformer(jsonResult, schema, selectedColumns).transform();
+    }
+  }
+
+  /**
+   * Converts a json value to the string stored in a result cell.
+   *
+   * @param element json value to convert
+   * @return {@code null} for a json null (on which {@code getAsString()} would throw), the
+   *         string form of the value otherwise
+   */
+  protected static String asStringOrNull(JsonElement element) {
+    return element.isJsonNull() ? null : element.getAsString();
+  }
+
+  /** Creates a mutable row with one {@code null} cell per schema column. */
+  protected List<Object> getEmptyRow() {
+    final List<Object> cells = Lists.newArrayList();
+    for (int i = 0; i < schema.size(); i++) {
+      cells.add(null);
+    }
+    return cells;
+  }
+
+  /** Transforms the json response given at construction time. */
+  public abstract ESResultSet transform();
+
+  /**
+   * Describes every schema column as a string column; name and comment of a column are both
+   * its alias and its position is its index in the schema.
+   */
+  protected LensResultSetMetadata getMetaData(final List<String> schema) {
+    return new LensResultSetMetadata() {
+      @Override
+      public List<ColumnDescriptor> getColumns() {
+        final List<ColumnDescriptor> descriptors = Lists.newArrayList();
+        int position = 0;
+        for (final String col : schema) {
+          descriptors.add(
+            new ColumnDescriptor(col, col, new TypeDescriptor(Type.STRING_TYPE), position++)
+          );
+        }
+        return descriptors;
+      }
+    };
+  }
+
+  /** Keys looked up in the json responses. */
+  protected static class ResultSetConstants {
+    public static final String AGGREGATIONS_KEY = "aggregations";
+    public static final String KEY_STRING = "key";
+    public static final String VALUE_KEY = "value";
+    public static final String BUCKETS_KEY = "buckets";
+    public static final String FILTER_WRAPPER_KEY = "filter_wrapper";
+    public static final String HITS_KEY = "hits";
+    public static final String FIELDS_KEY = "fields";
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/exceptions/ESClientException.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/exceptions/ESClientException.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/exceptions/ESClientException.java
new file mode 100644
index 0000000..c2254f1
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/exceptions/ESClientException.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.exceptions;
+
+import org.apache.lens.server.api.error.LensException;
+
+import lombok.NonNull;
+
+/**
+ * Exception raised by the es clients, for example when executing or explaining a query fails.
+ * <p>
+ * Every constructor simply forwards its arguments to the matching {@link LensException}
+ * constructor.
+ */
+public class ESClientException extends LensException {
+
+  public ESClientException() {
+  }
+
+  public ESClientException(String errorMsg) {
+    super(errorMsg);
+  }
+
+  public ESClientException(Throwable cause) {
+    super(cause);
+  }
+
+  public ESClientException(String s, Exception e) {
+    super(s, e);
+  }
+
+  public ESClientException(String errorMsg, Throwable cause) {
+    super(errorMsg, cause);
+  }
+
+  public ESClientException(int errorCode) {
+    super(errorCode);
+  }
+
+  public ESClientException(String errorMsg, int errorCode) {
+    super(errorMsg, errorCode);
+  }
+
+  public ESClientException(int errorCode, Throwable cause,
+    @NonNull Object... errorMsgFormattingArgs) {
+    super(errorCode, cause, errorMsgFormattingArgs);
+  }
+
+  public ESClientException(String errorMsg, int errorcode, Throwable cause,
+    @NonNull Object... errorMsgFormattingArgs) {
+    super(errorMsg, errorcode, cause, errorMsgFormattingArgs);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/exceptions/InvalidQueryException.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/exceptions/InvalidQueryException.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/exceptions/InvalidQueryException.java
new file mode 100644
index 0000000..8127cba
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/exceptions/InvalidQueryException.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.exceptions;
+
+import org.apache.lens.server.api.error.LensException;
+
+import lombok.NonNull;
+
+/**
+ * Exception raised when a query cannot be handled by the es driver.
+ * <p>
+ * The constructors taking only a message, or a message and an {@link Exception}, prefix the
+ * message with {@link #BASE_EXCEPTION_MESSAGE}; all other constructors forward their arguments
+ * to {@link LensException} unchanged. Note that the prefix is therefore not applied when the
+ * cause is passed as a plain {@link Throwable}.
+ */
+public class InvalidQueryException extends LensException {
+  static final String BASE_EXCEPTION_MESSAGE = "AST traversal failed!";
+
+  /**
+   * @param s detail message, appended to the base message
+   */
+  public InvalidQueryException(String s) {
+    super(withBaseMessage(s));
+  }
+
+  public InvalidQueryException(String errorMsg, Throwable cause) {
+    super(errorMsg, cause);
+  }
+
+  public InvalidQueryException() {
+  }
+
+  public InvalidQueryException(Throwable cause) {
+    super(cause);
+  }
+
+  public InvalidQueryException(int errorCode) {
+    super(errorCode);
+  }
+
+  public InvalidQueryException(String errorMsg, int errorCode) {
+    super(errorMsg, errorCode);
+  }
+
+  public InvalidQueryException(int errorCode, Throwable cause,
+    @NonNull Object... errorMsgFormattingArgs) {
+    super(errorCode, cause, errorMsgFormattingArgs);
+  }
+
+  public InvalidQueryException(String errorMsg, int errorcode, Throwable cause,
+    @NonNull Object... errorMsgFormattingArgs) {
+    super(errorMsg, errorcode, cause, errorMsgFormattingArgs);
+  }
+
+  /**
+   * @param s detail message, appended to the base message
+   * @param e cause of the failure
+   */
+  public InvalidQueryException(String s, Exception e) {
+    super(withBaseMessage(s), e);
+  }
+
+  /**
+   * Joins the base message and the detail message with a space, so that the two sentences do
+   * not run into each other ("AST traversal failed!detail").
+   */
+  private static String withBaseMessage(String detail) {
+    return BASE_EXCEPTION_MESSAGE + " " + detail;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ASTCriteriaVisitor.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ASTCriteriaVisitor.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ASTCriteriaVisitor.java
new file mode 100644
index 0000000..a0bd7c8
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ASTCriteriaVisitor.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.translator;
+
+import java.util.List;
+
+import org.apache.lens.driver.es.exceptions.InvalidQueryException;
+
+/**
+ * Visitor for the criteria part of a query: logical operators and simple predicates.
+ * Sub trees are handed over as ASTCriteriaVisitor instances that have already visited
+ * their own part of the tree. Instances are obtained from a CriteriaVisitorFactory.
+ */
+public interface ASTCriteriaVisitor {
+ /**
+ * Visits a unary logical operator together with the already visited subtree it applies to.
+ *
+ * @param logicalOp The unary operator
+ * @param visitedSubTree The visited subtree of unary operator
+ * @throws InvalidQueryException declared so that implementations can reject the operator
+ * (presumably when it is not supported; confirm against the implementing classes)
+ */
+ void visitUnaryLogicalOp(String logicalOp, ASTCriteriaVisitor visitedSubTree) throws InvalidQueryException;
+
+ /**
+ * Visits a logical operator together with the already visited subtrees it combines.
+ *
+ * @param logicalOp The logical operator
+ * @param visitedSubTrees The visited subtrees of the logical operator
+ * @throws InvalidQueryException declared so that implementations can reject the operator
+ * (presumably when it is not supported; confirm against the implementing classes)
+ */
+ void visitLogicalOp(String logicalOp, List<ASTCriteriaVisitor> visitedSubTrees) throws InvalidQueryException;
+
+ /**
+ * Visits simple predicate
+ *
+ * @param conditionalOp Operator constructing the predicate
+ * @param leftCol Left column
+ * @param rightExps Right expressions; a list, so an operator may receive several of them
+ * @throws InvalidQueryException declared so that implementations can reject the predicate
+ * (presumably when the operator is not supported; confirm against the implementing classes)
+ */
+ void visitPredicate(String conditionalOp, String leftCol, List<String> rightExps) throws InvalidQueryException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ASTVisitor.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ASTVisitor.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ASTVisitor.java
new file mode 100644
index 0000000..f654a60
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ASTVisitor.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.translator;
+
+/**
+ * The visitor interface for ASTInorderTraversal
+ * It covers only simple queries (no joins or sub queries)
+ * Also it does not handle complicated expressions
+ */
+public interface ASTVisitor {
+
+ /** Sort direction handed to {@link #visitOrderBy(String, OrderBy)}. */
+ enum OrderBy {ASC, DESC}
+
+ /**
+ * Visits a plain (non aggregated) column of the select list.
+ *
+ * @param columnName name of the selected column
+ * @param alias alias of the column; ESAggregateVisitor falls back to the column name
+ * when this is null
+ */
+ void visitSimpleSelect(String columnName, String alias);
+
+ /**
+ * Visits an aggregation over a column in the select list.
+ *
+ * @param aggregationType name of the aggregation
+ * @param columnName name of the aggregated column
+ * @param alias alias of the aggregation; ESAggregateVisitor falls back to the column
+ * name when this is null
+ */
+ void visitAggregation(String aggregationType, String columnName, String alias);
+
+ /**
+ * Visits the source of the query.
+ *
+ * @param database database name; ESVisitor uses it as the index
+ * @param table table name; ESVisitor uses it as the type
+ */
+ void visitFrom(String database, String table);
+
+ /**
+ * Visits the criteria of the query.
+ *
+ * @param visitedSubTree criteria visitor that has already visited the criteria subtree
+ */
+ void visitCriteria(ASTCriteriaVisitor visitedSubTree);
+
+ /**
+ * Visits one group by column.
+ *
+ * @param colName name of the group by column
+ */
+ void visitGroupBy(String colName);
+
+ /**
+ * Visits one order by column.
+ *
+ * @param colName name of the order by column
+ * @param orderBy sort direction for the column
+ */
+ void visitOrderBy(String colName, OrderBy orderBy);
+
+ /**
+ * Visits the limit of the query.
+ *
+ * @param limit the limit value
+ */
+ void visitLimit(int limit);
+
+ /**
+ * Finishes the visit. ESAggregateVisitor assembles its final query node here, so this is
+ * presumably called once after all other visit methods (confirm against the traverser).
+ */
+ void completeVisit();
+
+ /**
+ * Visits a selection of all columns ('*'). ESVisitor rejects it with an
+ * UnsupportedOperationException.
+ */
+ void visitAllCols();
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/CriteriaVisitorFactory.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/CriteriaVisitorFactory.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/CriteriaVisitorFactory.java
new file mode 100644
index 0000000..92ec10f
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/CriteriaVisitorFactory.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.translator;
+
+/**
+ * The interface CriteriaVisitorFactory that produces instances of type ASTCriteriaVisitor
+ */
+public interface CriteriaVisitorFactory {
+ /**
+ * Supplies a criteria visitor.
+ *
+ * @return an ASTCriteriaVisitor instance (presumably a fresh one per call, since visited
+ * subtrees are passed around as separate visitor instances; confirm against the
+ * implementing factory)
+ */
+ ASTCriteriaVisitor getInstance();
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ESVisitor.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ESVisitor.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ESVisitor.java
new file mode 100644
index 0000000..441f6d6
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ESVisitor.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.translator;
+
+import static org.apache.lens.driver.es.ESDriverConfig.MATCH_ALL;
+
+import static org.apache.hadoop.hive.ql.parse.HiveParser.*;
+
+import java.util.List;
+
+import org.apache.lens.cube.parse.HQLParser;
+import org.apache.lens.driver.es.ASTTraverserForES;
+import org.apache.lens.driver.es.ESDriverConfig;
+import org.apache.lens.driver.es.ESQuery;
+import org.apache.lens.driver.es.translator.impl.ESAggregateVisitor;
+import org.apache.lens.driver.es.translator.impl.ESCriteriaVisitor;
+import org.apache.lens.driver.es.translator.impl.ESCriteriaVisitorFactory;
+import org.apache.lens.driver.es.translator.impl.ESTermVisitor;
+import org.apache.lens.server.api.error.LensException;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import lombok.NonNull;
+
+/**
+ * The abstract visitor for constructing elastic search queries from ASTNode
+ */
+public abstract class ESVisitor implements ASTVisitor {
+  /** Shared factory for every JSON node created while a query is built. */
+  protected static final JsonNodeFactory JSON_NODE_FACTORY = JsonNodeFactory.instance;
+
+  /** Root JSON object of the query; its string form is handed to the {@link ESQuery}. */
+  protected final ObjectNode queryNode = JSON_NODE_FACTORY.objectNode();
+  /** Output names of the selected columns; must stay parallel to {@link #selectedColumnNames}. */
+  protected final List<String> querySchema = Lists.newArrayList();
+  /** Names of the selected columns; must stay parallel to {@link #querySchema}. */
+  protected final List<String> selectedColumnNames = Lists.newArrayList();
+  @NonNull
+  protected final ESDriverConfig config;
+  /** Kind of query produced; set by the concrete visitor. */
+  protected ESQuery.QueryType queryType;
+
+  /** Taken from the database passed to {@link #visitFrom(String, String)}. */
+  protected String index;
+  /** Taken from the table passed to {@link #visitFrom(String, String)}. */
+  protected String type;
+  /** Starts as the configured maximum and is replaced by {@link #visitLimit(int)}. */
+  protected int limit;
+  /** Criteria of the query; a match-all node until {@link #visitCriteria} replaces it. */
+  protected JsonNode criteriaNode = getMatchAllNode();
+
+  /**
+   * @param config driver configuration; also provides the initial value of {@link #limit}
+   */
+  protected ESVisitor(ESDriverConfig config) {
+    this.config = config;
+    limit = config.getMaxLimit();
+  }
+
+  /**
+   * Builds the default criteria: an object holding an empty object under the MATCH_ALL key.
+   *
+   * @return a new match-all node
+   */
+  private static JsonNode getMatchAllNode() {
+    final ObjectNode matchAllNode = JSON_NODE_FACTORY.objectNode();
+    matchAllNode.put(MATCH_ALL, JSON_NODE_FACTORY.objectNode());
+    return matchAllNode;
+  }
+
+  /**
+   * Parses the given HQL and rewrites it into an elastic search query.
+   *
+   * <p>Any exception raised while parsing is rethrown wrapped in an unchecked exception, it
+   * does not surface as a {@link LensException}.
+   *
+   * @param config driver configuration
+   * @param hql    query text to parse
+   * @return the rewritten query
+   * @throws LensException if rewriting the parsed tree fails
+   */
+  public static ESQuery rewrite(ESDriverConfig config, String hql) throws LensException {
+    ASTNode rootQueryNode;
+    try {
+      rootQueryNode = HQLParser.parseHQL(hql, new HiveConf());
+    } catch (Exception e) {
+      throw new ESRewriteException(e);
+    }
+    return rewrite(config, rootQueryNode);
+  }
+
+  /**
+   * Rewrites a parsed query into an elastic search query. An {@link ESAggregateVisitor} is
+   * used for aggregate queries and an {@link ESTermVisitor} for all others; the chosen
+   * visitor is driven over the tree by an {@link ASTTraverserForES}.
+   *
+   * @param config        driver configuration
+   * @param rootQueryNode root node of the query AST
+   * @return the query assembled by the visitor
+   * @throws LensException if traversing the tree fails
+   */
+  public static ESQuery rewrite(ESDriverConfig config, ASTNode rootQueryNode) throws LensException {
+    final ESVisitor visitor;
+    if (isAggregateQuery(rootQueryNode)) {
+      visitor = new ESAggregateVisitor(config);
+    } else {
+      visitor = new ESTermVisitor(config);
+    }
+
+    final ASTTraverserForES traverser =
+      new ASTTraverserForES(rootQueryNode, visitor, new ESCriteriaVisitorFactory());
+    traverser.accept();
+
+    return visitor.getQuery();
+  }
+
+  /**
+   * Tells whether the query needs the aggregate visitor: it has a group by clause, or every
+   * select expression is a function.
+   *
+   * <p>TODO: move to a proper util class.
+   *
+   * @param rootQueryNode root node of AST
+   * @return if the query is aggregate
+   */
+  private static boolean isAggregateQuery(ASTNode rootQueryNode) {
+    return hasGroupBy(rootQueryNode) || areAllColumnsAggregate(rootQueryNode);
+  }
+
+  /**
+   * Checks that the first child of every select expression is a TOK_FUNCTION node. Only the
+   * token type is inspected, the function itself is not examined.
+   *
+   * <p>TODO: move to a proper util class.
+   *
+   * @param rootQueryNode root node of AST
+   * @return if all columns are aggregate
+   */
+  private static boolean areAllColumnsAggregate(ASTNode rootQueryNode) {
+    // Node names are compared as text, so the token id is rendered once up front.
+    final String functionToken = String.valueOf(TOK_FUNCTION);
+    final ASTNode selectNode = HQLParser.findNodeByPath(rootQueryNode, TOK_INSERT, TOK_SELECT);
+    for (Node selectExp : selectNode.getChildren()) {
+      final Node firstChild = selectExp.getChildren().get(0);
+      if (!functionToken.equals(firstChild.getName())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * @param rootQueryNode root node of AST
+   * @return if a TOK_GROUPBY node exists below TOK_INSERT
+   */
+  private static boolean hasGroupBy(ASTNode rootQueryNode) {
+    return HQLParser.findNodeByPath(rootQueryNode, TOK_INSERT, TOK_GROUPBY) != null;
+  }
+
+  /**
+   * Reduces a dot separated column reference to its last part.
+   *
+   * @param cannonicalColName column reference, possibly qualified with dots
+   * @return the text after the last dot, or the whole input when it has no dot
+   */
+  protected static String visitColumn(String cannonicalColName) {
+    final String[] colParts = cannonicalColName.split("\\.");
+    return colParts[colParts.length - 1];
+  }
+
+  /**
+   * Records the database as the index and the table as the type of the query.
+   *
+   * @param database database name, must not be null
+   * @param table    table name, must not be null
+   * @throws NullPointerException if either argument is null
+   */
+  @Override
+  public void visitFrom(String database, String table) {
+    Validate.notNull(database);
+    Validate.notNull(table);
+    index = database;
+    type = table;
+  }
+
+  /**
+   * Replaces the current limit with the given one.
+   *
+   * @param limit the limit of the query
+   */
+  @Override
+  public void visitLimit(int limit) {
+    this.limit = limit;
+  }
+
+  /**
+   * Takes over the criteria node built by the given visitor.
+   *
+   * @param visitedSubTree visited criteria; has to be an {@link ESCriteriaVisitor}
+   * @throws ClassCastException if the visitor is of any other type
+   */
+  @Override
+  public void visitCriteria(ASTCriteriaVisitor visitedSubTree) {
+    criteriaNode = ((ESCriteriaVisitor) visitedSubTree).getNode();
+  }
+
+  /**
+   * Always fails, selecting all columns is not supported.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void visitAllCols() {
+    throw new UnsupportedOperationException("'*' is not supported in elastic search, select the columns required");
+  }
+
+  /**
+   * Packs the state collected during the visit into an {@link ESQuery}. Schema and column
+   * names are handed over as immutable copies.
+   *
+   * @return the assembled query
+   * @throws IllegalArgumentException if schema and selected column lists differ in size
+   */
+  public ESQuery getQuery() {
+    Validate.isTrue(querySchema.size() == selectedColumnNames.size(),
+      "Query building seems to have gone wrong... schema and alias list size seems to be different");
+    return new ESQuery(index, type, queryNode.toString(), ImmutableList.copyOf(querySchema),
+      ImmutableList.copyOf(selectedColumnNames), queryType, limit);
+  }
+
+  /** Unchecked wrapper for exceptions raised while parsing the HQL text. */
+  private static class ESRewriteException extends RuntimeException {
+    public ESRewriteException(Exception e) {
+      super(e);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/impl/ESAggregateVisitor.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/impl/ESAggregateVisitor.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/impl/ESAggregateVisitor.java
new file mode 100644
index 0000000..69f20d3
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/impl/ESAggregateVisitor.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.translator.impl;
+
+import java.util.Map;
+
+import org.apache.lens.driver.es.ESDriverConfig;
+import org.apache.lens.driver.es.ESQuery;
+import org.apache.lens.driver.es.translator.ESVisitor;
+
+import org.apache.commons.lang3.Validate;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Maps;
+
+/**
+ * Visitor for traversing aggregate elastic search queries
+ * that involve group by or simple aggregations
+ */
+public class ESAggregateVisitor extends ESVisitor {
+
+  /** Outermost node; receives the filter and the first level of nested aggregations. */
+  private final ObjectNode groupByNode = JSON_NODE_FACTORY.objectNode();
+  /** Aggregations keyed by output alias; attached below the innermost group by level. */
+  private final ObjectNode aggNode = JSON_NODE_FACTORY.objectNode();
+  /** Node that the next nesting level gets attached to. */
+  private ObjectNode currentGroupByNode = groupByNode;
+  /** Plain selected columns mapped to their output alias. */
+  private final Map<String, String> groupByKeys = Maps.newHashMap();
+
+  /**
+   * @param config driver configuration
+   */
+  public ESAggregateVisitor(ESDriverConfig config) {
+    super(config);
+    queryType = ESQuery.QueryType.AGGR;
+  }
+
+  /**
+   * Registers a plain column of the select list and remembers it as a possible group by key.
+   *
+   * @param columnName selected column, may be qualified
+   * @param alias      output name; the unqualified column name is used when null
+   * @throws IllegalArgumentException if the output name is already part of the schema
+   */
+  @Override
+  public void visitSimpleSelect(String columnName, String alias) {
+    final String column = visitColumn(columnName);
+    final String aliasName = alias == null ? column : alias;
+    // The alias is passed as a format argument: a '%' inside it must not be parsed as a
+    // format specifier.
+    Validate.isTrue(!querySchema.contains(aliasName), "Ambiguous alias '%s'", aliasName);
+    querySchema.add(aliasName);
+    selectedColumnNames.add(column);
+    groupByKeys.put(column, aliasName);
+  }
+
+  /**
+   * Registers an aggregation over a column under its output name.
+   *
+   * @param aggregationType key looked up in {@link ESDriverConfig#AGGREGATIONS}
+   * @param columnName      aggregated column, may be qualified
+   * @param alias           output name; the unqualified column name is used when null
+   * @throws IllegalArgumentException if the aggregation type has no mapping
+   */
+  @Override
+  public void visitAggregation(String aggregationType, String columnName, String alias) {
+    final String column = visitColumn(columnName);
+    final String aliasName = alias == null ? column : alias;
+    // Fail before any state changes: an unmapped type would otherwise end up as a null key
+    // inside the generated query.
+    final String esAggregation = ESDriverConfig.AGGREGATIONS.get(aggregationType);
+    Validate.isTrue(esAggregation != null, "Unsupported aggregation '%s'", aggregationType);
+    // NOTE(review): unlike visitSimpleSelect, a repeated output name is not rejected here and
+    // replaces the earlier entry in aggNode - confirm whether that is intended.
+    querySchema.add(aliasName);
+    selectedColumnNames.add(column);
+    final ObjectNode fieldNode = JSON_NODE_FACTORY.objectNode();
+    fieldNode.put(ESDriverConfig.FIELD, column);
+    final ObjectNode aggMeasures = JSON_NODE_FACTORY.objectNode();
+    aggMeasures.put(esAggregation, fieldNode);
+    aggNode.put(aliasName, aggMeasures);
+  }
+
+  /**
+   * Adds one more nesting level for the given group by column. The level is named after the
+   * output name the column was selected with.
+   *
+   * @param groupBy group by column, may be qualified
+   * @throws NullPointerException if the column was not visited as a plain select column
+   */
+  @Override
+  public void visitGroupBy(String groupBy) {
+    final String column = visitColumn(groupBy);
+    // Validate first so that a rejected column leaves the tree untouched.
+    final String bucketName =
+      Validate.notNull(groupByKeys.get(column), "Group by column has to be used in select");
+
+    final ObjectNode termsNode = JSON_NODE_FACTORY.objectNode();
+    termsNode.put(ESDriverConfig.FIELD, column);
+    termsNode.put(ESDriverConfig.SIZE, config.getAggrBucketSize());
+
+    final ObjectNode bucketNode = JSON_NODE_FACTORY.objectNode();
+    bucketNode.put(ESDriverConfig.TERMS, termsNode);
+
+    final ObjectNode nestedAggsNode = JSON_NODE_FACTORY.objectNode();
+    nestedAggsNode.put(bucketName, bucketNode);
+    currentGroupByNode.put(ESDriverConfig.AGGS, nestedAggsNode);
+
+    // The next group by (or the aggregations) nests below the level just created.
+    currentGroupByNode = bucketNode;
+  }
+
+  /**
+   * Always fails, ordering is not supported for aggregate queries.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void visitOrderBy(String colName, OrderBy orderBy) {
+    // TODO, the feature is partially available in elasticsearch.
+    // http://tinyurl.com/p6e5upo
+    throw new UnsupportedOperationException("Order by cannot be used with aggregate queries");
+  }
+
+  /**
+   * Assembles the final query node: size and time out on the root, the outermost group by
+   * node (carrying the criteria as filter) wrapped below the root aggregations, and the
+   * collected aggregations attached to the innermost level.
+   */
+  @Override
+  public void completeVisit() {
+    queryNode.put(ESDriverConfig.SIZE, ESDriverConfig.AGGR_TERM_FETCH_SIZE);
+    queryNode.put(ESDriverConfig.QUERY_TIME_OUT_STRING, config.getQueryTimeOutMs());
+    final ObjectNode outerAggsNode = JSON_NODE_FACTORY.objectNode();
+    queryNode.put(ESDriverConfig.AGGS, outerAggsNode);
+    outerAggsNode.put(ESDriverConfig.FILTER_WRAPPER, groupByNode);
+    groupByNode.put(ESDriverConfig.FILTER, criteriaNode);
+    currentGroupByNode.put(ESDriverConfig.AGGS, aggNode);
+  }
+}