You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by pr...@apache.org on 2015/08/11 14:20:50 UTC

[27/50] [abbrv] incubator-lens git commit: LENS-252 : Add elastic search driver

LENS-252 : Add elastic search driver


Project: http://git-wip-us.apache.org/repos/asf/incubator-lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-lens/commit/bffa78c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-lens/tree/bffa78c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-lens/diff/bffa78c9

Branch: refs/heads/current-release-line
Commit: bffa78c9705d8eb75f72e0c916294af162e722c7
Parents: 882182a
Author: Amruth S <am...@gmail.com>
Authored: Fri Jul 31 13:44:54 2015 +0530
Committer: Amareshwari Sriramadasu <am...@apache.org>
Committed: Fri Jul 31 13:44:54 2015 +0530

----------------------------------------------------------------------
 .../apache/lens/doc/TestGenerateConfigDoc.java  |  21 +
 lens-driver-es/pom.xml                          |  79 +++
 .../lens/driver/es/ASTTraverserForES.java       | 359 ++++++++++++
 .../org/apache/lens/driver/es/ESDriver.java     | 389 +++++++++++++
 .../apache/lens/driver/es/ESDriverConfig.java   | 122 ++++
 .../java/org/apache/lens/driver/es/ESQuery.java |  55 ++
 .../apache/lens/driver/es/client/ESClient.java  | 169 ++++++
 .../lens/driver/es/client/ESResultSet.java      |  76 +++
 .../driver/es/client/jest/JestClientImpl.java   |  94 +++
 .../client/jest/JestResultSetTransformer.java   | 215 +++++++
 .../driver/es/exceptions/ESClientException.java |  62 ++
 .../es/exceptions/InvalidQueryException.java    |  65 +++
 .../es/translator/ASTCriteriaVisitor.java       |  48 ++
 .../lens/driver/es/translator/ASTVisitor.java   |  47 ++
 .../es/translator/CriteriaVisitorFactory.java   |  26 +
 .../lens/driver/es/translator/ESVisitor.java    | 178 ++++++
 .../es/translator/impl/ESAggregateVisitor.java  | 106 ++++
 .../es/translator/impl/ESCriteriaVisitor.java   | 100 ++++
 .../impl/ESCriteriaVisitorFactory.java          |  29 +
 .../es/translator/impl/ESTermVisitor.java       |  94 +++
 .../src/main/resources/esdriver-default.xml     |  62 ++
 .../org/apache/lens/driver/es/ESDriverTest.java |  43 ++
 .../org/apache/lens/driver/es/MockClientES.java | 145 +++++
 .../lens/driver/es/QueryTranslationTest.java    | 134 +++++
 .../driver/es/ResultSetTransformationTest.java  | 573 +++++++++++++++++++
 .../lens/driver/es/ScrollingQueryTest.java      |  93 +++
 .../src/test/resources/invalid-queries.data     |  18 +
 .../src/test/resources/valid-queries.data       |  67 +++
 .../lens/server/api/LensConfConstants.java      |  10 +
 lens-server/pom.xml                             |   5 +
 pom.xml                                         |  34 ++
 src/site/apt/admin/config-server.apt            |  19 +-
 src/site/apt/admin/esdriver-config.apt          |  41 ++
 33 files changed, 3570 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-api/src/test/java/org/apache/lens/doc/TestGenerateConfigDoc.java
----------------------------------------------------------------------
diff --git a/lens-api/src/test/java/org/apache/lens/doc/TestGenerateConfigDoc.java b/lens-api/src/test/java/org/apache/lens/doc/TestGenerateConfigDoc.java
index 5ef4186..8025360 100644
--- a/lens-api/src/test/java/org/apache/lens/doc/TestGenerateConfigDoc.java
+++ b/lens-api/src/test/java/org/apache/lens/doc/TestGenerateConfigDoc.java
@@ -62,6 +62,11 @@ public class TestGenerateConfigDoc {
   public static final String JDBC_DRIVER_CONF_FILE = "../lens-driver-jdbc/src/main/resources/jdbcdriver-default.xml";
 
   /**
+   * The Constant ES_DRIVER_CONF_FILE.
+   */
+  public static final String ES_DRIVER_CONF_FILE = "../lens-driver-es/src/main/resources/esdriver-default.xml";
+
+  /**
    * The Constant CLIENT_CONF_FILE.
    */
   public static final String CLIENT_CONF_FILE = "../lens-client/src/main/resources/lens-client-default.xml";
@@ -92,6 +97,11 @@ public class TestGenerateConfigDoc {
   public static final String JDBC_DRIVER_APT_FILE = "../src/site/apt/admin/jdbcdriver-config.apt";
 
   /**
+   * The Constant ES_DRIVER_APT_FILE.
+   */
+  public static final String ES_DRIVER_APT_FILE = "../src/site/apt/admin/esdriver-config.apt";
+
+  /**
    * The Constant CLIENT_APT_FILE.
    */
   public static final String CLIENT_APT_FILE = "../src/site/apt/user/client-config.apt";
@@ -146,6 +156,17 @@ public class TestGenerateConfigDoc {
   }
 
   /**
+   * Generate esdriver config doc.
+   *
+   * @throws Exception the exception
+   */
+  @Test
+  public void generateESdriverConfigDoc() throws Exception {
+    ConfigPrinter printer = new ConfigPrinter(ES_DRIVER_CONF_FILE, ES_DRIVER_APT_FILE);
+    printer.generateDoc("ES driver configuration");
+  }
+
+  /**
    * Generate client config doc.
    *
    * @throws Exception the exception

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/pom.xml
----------------------------------------------------------------------
diff --git a/lens-driver-es/pom.xml b/lens-driver-es/pom.xml
new file mode 100644
index 0000000..7cff3ff
--- /dev/null
+++ b/lens-driver-es/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0"?>
+<!--
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.lens</groupId>
+        <artifactId>apache-lens</artifactId>
+        <version>2.3.0-beta-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>lens-driver-es</artifactId>
+    <packaging>jar</packaging>
+    <description>ES execution driver</description>
+
+    <name>Lens Elastic Search Driver</name>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-service</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.lens</groupId>
+            <artifactId>lens-cube</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <artifactId>lens-api</artifactId>
+            <groupId>org.apache.lens</groupId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.searchbox</groupId>
+            <artifactId>jest</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/ASTTraverserForES.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/ASTTraverserForES.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/ASTTraverserForES.java
new file mode 100644
index 0000000..5e082e5
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/ASTTraverserForES.java
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.lens.cube.parse.HQLParser;
+import org.apache.lens.driver.es.exceptions.InvalidQueryException;
+import org.apache.lens.driver.es.translator.ASTCriteriaVisitor;
+import org.apache.lens.driver.es.translator.ASTVisitor;
+import org.apache.lens.driver.es.translator.CriteriaVisitorFactory;
+import org.apache.lens.server.api.error.LensException;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.HiveParser;
+
+import com.google.common.collect.Lists;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * This traverses ASTNode in inorder fashion.
+ * More visitors (translation/validation) can be added.
+ * Any SQL query can be converted to ASTNode and can be traversed using this traversal
+ *
+ * Currently this traversal is limited for elastic search. So naming it this way.
+ *
+ * Look at the constructor for usage.
+ */
+@RequiredArgsConstructor
+public final class ASTTraverserForES {
+
+  /**
+   * the root node of the ASTNode
+   */
+  @NonNull
+  private final ASTNode rootQueryNode;
+  /**
+   * The basic query visitor
+   */
+  @NonNull
+  private final ASTVisitor visitor;
+  /**
+   * the criteria visitor factory,
+   * traversal has to create multiple criteria visitor objects for
+   * nested criteria. The impl of factory would determine the type of
+   * criteria visitor
+   */
+  @NonNull
+  private final CriteriaVisitorFactory criteriaVisitorFactory;
+
+  public void accept() throws InvalidQueryException {
+    traverseSelects();
+    traverseTableName();
+    traverseCriteria();
+    traverseGroupBy();
+    traverseOrderBy();
+    traverseLimit();
+    visitor.completeVisit();
+  }
+
+  /**
+   * Visit select expressions
+   */
+  public void traverseSelects() throws InvalidQueryException {
+    final ASTNode selectNode = HQLParser.findNodeByPath(rootQueryNode, HiveParser.TOK_INSERT, HiveParser.TOK_SELECT);
+    if (selectNode == null) {
+      throw new InvalidQueryException("No columns are selected!");
+    }
+    try {
+      for (Node selectExp : selectNode.getChildren()) {
+        final Node innerNode = Helper.getFirstChild(selectExp);
+        final String alias = Helper.getAliasFromSelectExpr(selectExp);
+        if (innerNode.getName().equals(String.valueOf(HiveParser.TOK_FUNCTION))) {
+          Validate.isTrue(innerNode.getChildren().size() == 2);
+          visitor.visitAggregation(
+            Helper.getFirstChild(innerNode).toString(),
+            Helper.getColumnNameFrom(innerNode.getChildren().get(1)),
+            alias
+          );
+        } else if (innerNode.getName().equals(String.valueOf(HiveParser.TOK_ALLCOLREF))) {
+          visitor.visitAllCols();
+        } else if (innerNode.getName().equals(String.valueOf(HiveParser.TOK_TABLE_OR_COL))
+          || innerNode.toString().equals(".")) {
+          visitor.visitSimpleSelect(
+            Helper.getColumnNameFrom(innerNode),
+            alias
+          );
+        } else {
+          throw new InvalidQueryException(selectExp.getName() + " seems to be invalid");
+        }
+      }
+    } catch (Exception e) {
+      throw new InvalidQueryException("Exception while traversing select expressions", e);
+    }
+
+  }
+
+  /**
+   * Visit table name
+   */
+  private void traverseTableName() throws InvalidQueryException {
+    try {
+      final ASTNode dbSchemaTable = HQLParser.findNodeByPath(
+        rootQueryNode,
+        HiveParser.TOK_FROM,
+        HiveParser.TOK_TABREF,
+        HiveParser.TOK_TABNAME);
+      Validate.notNull(dbSchemaTable, "Index and type not present");
+      Validate.isTrue(dbSchemaTable.getChildren().size() == 2, "Index and type not present");
+      final String dbSchema = dbSchemaTable.getChild(0).getText();
+      final String tableName = dbSchemaTable.getChild(1).getText();
+      visitor.visitFrom(dbSchema, tableName);
+    } catch (Exception e) {
+      throw new InvalidQueryException("Error while traversing table name "
+        + "- Expected grammar .. from <index>.<type>", e);
+    }
+  }
+
+  /**
+   * Visit criteria
+   */
+  private void traverseCriteria() throws InvalidQueryException {
+    try {
+      final ASTNode criteriaNode = HQLParser.findNodeByPath(rootQueryNode,
+        HiveParser.TOK_INSERT, HiveParser.TOK_WHERE);
+      if (criteriaNode != null) {
+        visitor.visitCriteria(traverseCriteriaRecursively(Helper.getFirstChild(criteriaNode)));
+      }
+    } catch (Exception e) {
+      throw new InvalidQueryException("Exception while traversing criteria", e);
+    }
+  }
+
+  private ASTCriteriaVisitor traversePredicate(Node whereClause, PredicateInfo predicateInfo)
+    throws InvalidQueryException {
+    final ASTCriteriaVisitor childVisitor = criteriaVisitorFactory.getInstance();
+    final ArrayList<String> rightExpressions = Lists.newArrayList();
+    final List<? extends Node> rightExpList = whereClause.getChildren();
+    String leftCol;
+    switch (predicateInfo.predicateType) {
+    case BETWEEN:
+      Validate.isTrue(rightExpList.size()==5, "Atleast one right expression needed");
+      rightExpressions.add(whereClause.getChildren().get(3).toString());
+      rightExpressions.add(whereClause.getChildren().get(4).toString());
+      leftCol = whereClause.getChildren().get(2).getChildren().get(1).toString();
+      break;
+    case SIMPLE:
+      Validate.isTrue(rightExpList.size()>1, "Atleast one right expression needed");
+      for(Node rightExp : rightExpList.subList(1, rightExpList.size())) {
+        rightExpressions.add(rightExp.toString());
+      }
+      leftCol = Helper.getLeftColFromPredicate(whereClause);
+      break;
+    default:
+      throw new InvalidQueryException("No handlers for predicate " + predicateInfo.predicateType);
+    }
+    childVisitor.visitPredicate(predicateInfo.predicateOp
+      , leftCol
+      , rightExpressions);
+    return childVisitor;
+  }
+
+  private ASTCriteriaVisitor traverseLogicalOperator(Node whereClause, LogicalOpInfo logicalOpInfo)
+    throws InvalidQueryException {
+    final ASTCriteriaVisitor childVisitor = criteriaVisitorFactory.getInstance();
+    final List<ASTCriteriaVisitor> childVisitors = Lists.newArrayList();
+    for (Node node : whereClause.getChildren()) {
+      childVisitors.add(traverseCriteriaRecursively(node));
+    }
+    switch (logicalOpInfo.logicalOpType) {
+    case UNARY:
+      childVisitor.visitUnaryLogicalOp(logicalOpInfo.logicalOperator, childVisitors.get(0));
+      break;
+    case BINARY:
+      childVisitor.visitLogicalOp(logicalOpInfo.logicalOperator, childVisitors);
+      break;
+    }
+    return childVisitor;
+  }
+
+  private ASTCriteriaVisitor traverseCriteriaRecursively(Node whereClause) throws InvalidQueryException {
+    final CriteriaInfo criteriaInfo = Helper.getCriteriaInfo(whereClause);
+    switch (criteriaInfo.criteriaType) {
+    case PREDICATE:
+      return traversePredicate(whereClause, (PredicateInfo) criteriaInfo);
+    case LOGICAL:
+      return traverseLogicalOperator(whereClause, (LogicalOpInfo) criteriaInfo);
+    default:
+      throw new InvalidQueryException("Expecting a predicate or logical operator but got this "
+        + whereClause.toString());
+    }
+  }
+
+  /**
+   * Visit group by
+   */
+  private void traverseGroupBy() throws InvalidQueryException {
+    try {
+      final ASTNode groupByNode = HQLParser.findNodeByPath(rootQueryNode,
+        HiveParser.TOK_INSERT, HiveParser.TOK_GROUPBY);
+      if (groupByNode != null) {
+        for (Node groupBy : groupByNode.getChildren()) {
+          visitor.visitGroupBy(Helper.getColumnNameFrom(groupBy));
+        }
+      }
+    } catch (Exception e) {
+      throw new InvalidQueryException("Exception while parsing group by", e);
+    }
+  }
+
+  /**
+   * Visit order by
+   */
+  private void traverseOrderBy() throws InvalidQueryException {
+    try {
+      final ASTNode orderByNode = HQLParser.findNodeByPath(rootQueryNode,
+        HiveParser.TOK_INSERT, HiveParser.TOK_ORDERBY);
+      if (orderByNode != null) {
+        for (Node orderBy : orderByNode.getChildren()) {
+          visitor.visitOrderBy(
+            Helper.getColumnNameFrom(Helper.getFirstChild(orderBy)),
+            orderBy.getName().equals(String.valueOf(HiveParser.TOK_TABSORTCOLNAMEDESC))
+              ?
+              ASTVisitor.OrderBy.DESC
+              :
+              ASTVisitor.OrderBy.ASC
+          );
+        }
+      }
+    } catch (Exception e) {
+      throw new InvalidQueryException("Exception while parsing order by", e);
+    }
+  }
+
+  /**
+   * Visit limit
+   */
+  private void traverseLimit() throws InvalidQueryException {
+    try {
+      final ASTNode limitNode = HQLParser.findNodeByPath(rootQueryNode,
+        HiveParser.TOK_INSERT, HiveParser.TOK_LIMIT);
+      if (limitNode != null) {
+        visitor.visitLimit(Integer.parseInt(Helper.getFirstChild(limitNode).toString()));
+      }
+    } catch (Exception e) {
+      throw new InvalidQueryException("Error while parsing limit, format should be limit <int>", e);
+    }
+  }
+
+  private enum PredicateType {SIMPLE, BETWEEN};
+  private enum CriteriaType {PREDICATE, LOGICAL}
+  private enum LogicalOpType {UNARY, BINARY}
+  private static class CriteriaInfo {
+    final CriteriaType criteriaType;
+
+    public CriteriaInfo(CriteriaType criteriaType) {
+      this.criteriaType = criteriaType;
+    }
+  }
+  private static class LogicalOpInfo extends CriteriaInfo{
+    final String logicalOperator;
+    final LogicalOpType logicalOpType;
+
+    public LogicalOpInfo(String logicalOperator, LogicalOpType logicalOpType) {
+      super(CriteriaType.LOGICAL);
+      this.logicalOperator = logicalOperator;
+      this.logicalOpType = logicalOpType;
+    }
+  }
+  private static class PredicateInfo extends CriteriaInfo {
+    final PredicateType predicateType;
+    final String predicateOp;
+
+
+    public PredicateInfo(String operator, PredicateType predicateType) {
+      super(CriteriaType.PREDICATE);
+      this.predicateType = predicateType;
+      this.predicateOp = operator;
+    }
+  }
+
+  private static class Helper {
+
+    private static List<String> predicates = Lists.newArrayList("=", ">", "<", "<=", ">=", "between");
+    private static List<String> unaryLogicalOps = Lists.newArrayList("not", "!");
+    private static List<String> binaryLogicalOps = Lists.newArrayList("and", "or", "&", "|", "&&", "||");
+    private static List<String> logicalOps = Lists.newArrayList();
+    static {
+      logicalOps.addAll(unaryLogicalOps);
+      logicalOps.addAll(binaryLogicalOps);
+    }
+
+    private static String getAliasFromSelectExpr(Node selectExp) {
+      return selectExp.getChildren().size() == 2
+        ?
+        selectExp.getChildren().get(1).toString()
+        :
+        null;
+    }
+
+    private static CriteriaInfo getCriteriaInfo(Node whereClause) throws InvalidQueryException {
+      String whereRoot = whereClause.toString();
+      if (Helper.unaryLogicalOps.contains(whereRoot)) {
+        return new LogicalOpInfo(whereRoot, LogicalOpType.UNARY);
+      } else if (Helper.binaryLogicalOps.contains(whereRoot)) {
+        return new LogicalOpInfo(whereRoot, LogicalOpType.BINARY);
+      } else if (Helper.predicates.contains(whereRoot)) {
+        return new PredicateInfo(whereRoot, PredicateType.SIMPLE);
+      } else if (whereRoot.equals("TOK_FUNCTION") && whereClause.getChildren().get(0).toString().equals("between")) {
+        return new PredicateInfo("between", PredicateType.BETWEEN);
+      } else {
+        throw new InvalidQueryException("Could not get criteria info for where clause " + whereRoot);
+      }
+    }
+
+    private static Node getFirstChild(Node node) throws LensException {
+      try {
+        return node.getChildren().get(0);
+      } catch (Exception e) {
+        throw new LensException("Expecting a non empty first child for " + node.toString(), e);
+      }
+    }
+
+    private static String getLeftColFromPredicate(Node predicateNode) throws InvalidQueryException {
+      try {
+        return getColumnNameFrom(getFirstChild(predicateNode));
+      } catch (Exception e) {
+        throw new InvalidQueryException("Only simple predicates of the grammar <col>=<val> is supported as of now", e);
+      }
+    }
+
+    private static String getColumnNameFrom(Node columnNode) {
+      final StringBuilder stringBuilder = new StringBuilder();
+      HQLParser.toInfixString((ASTNode) columnNode, stringBuilder);
+      return stringBuilder.toString().replaceAll("[() ]", "");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESDriver.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESDriver.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESDriver.java
new file mode 100644
index 0000000..14d9f99
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESDriver.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lens.api.query.QueryHandle;
+import org.apache.lens.api.query.QueryPrepareHandle;
+import org.apache.lens.cube.metadata.CubeMetastoreClient;
+import org.apache.lens.cube.parse.HQLParser;
+import org.apache.lens.driver.es.client.ESClient;
+import org.apache.lens.driver.es.client.ESResultSet;
+import org.apache.lens.driver.es.client.jest.JestClientImpl;
+import org.apache.lens.driver.es.translator.ESVisitor;
+import org.apache.lens.server.api.LensConfConstants;
+import org.apache.lens.server.api.driver.*;
+import org.apache.lens.server.api.error.LensException;
+import org.apache.lens.server.api.events.LensEventListener;
+import org.apache.lens.server.api.query.AbstractQueryContext;
+import org.apache.lens.server.api.query.PreparedQueryContext;
+import org.apache.lens.server.api.query.QueryContext;
+import org.apache.lens.server.api.query.collect.WaitingQueriesSelectionPolicy;
+import org.apache.lens.server.api.query.constraint.QueryLaunchingConstraint;
+import org.apache.lens.server.api.query.cost.FactPartitionBasedQueryCost;
+import org.apache.lens.server.api.query.cost.QueryCost;
+
+import org.apache.commons.lang.Validate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.HiveParser;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.antlr.runtime.CommonToken;
+import org.antlr.runtime.tree.Tree;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Driver for elastic search
+ */
+@Slf4j
+public class ESDriver implements LensDriver {
+
+  private static final AtomicInteger THID = new AtomicInteger();
+  private static final double STREAMING_PARTITION_COST = 0;
+  private static final QueryCost ES_DRIVER_COST = new FactPartitionBasedQueryCost(STREAMING_PARTITION_COST);
+
+  private Configuration conf;
+  private ESClient esClient;
+  private ExecutorService asyncQueryPool;
+  private ESDriverConfig config;
+
+  /**
+   * States
+   */
+  private final Map<String, ESQuery> rewrittenQueriesCache = Maps.newConcurrentMap();
+  private final Map<QueryHandle, Future<LensResultSet>> resultSetMap = Maps.newConcurrentMap();
+  private final Map<QueryHandle, QueryCompletionListener> handleListenerMap = Maps.newConcurrentMap();
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public QueryCost estimate(AbstractQueryContext qctx) {
+    return ES_DRIVER_COST;
+  }
+
+  @Override
+  public DriverQueryPlan explain(final AbstractQueryContext context) throws LensException {
+    final ESQuery esQuery = rewrite(context);
+    final String jsonExplanation = esClient.explain(esQuery);
+    if (jsonExplanation == null) {
+      throw new LensException("Explanation failed, empty json was returned");
+    }
+    return new DriverQueryPlan() {
+      @Override
+      public String getPlan() {
+        return jsonExplanation;
+      }
+
+      @Override
+      public QueryCost getCost() {
+        return ES_DRIVER_COST;
+      }
+    };
+  }
+
+  @Override
+  public void prepare(PreparedQueryContext pContext) throws LensException {
+    rewrite(pContext);
+  }
+
+  @Override
+  public DriverQueryPlan explainAndPrepare(PreparedQueryContext pContext) throws LensException {
+    prepare(pContext);
+    return explain(pContext);
+  }
+
+  @Override
+  public void closePreparedQuery(QueryPrepareHandle handle) {
+    /**
+     * Elastic search does not have a concept of prepared query.
+     */
+  }
+
+  @Override
+  public LensResultSet execute(QueryContext context) throws LensException {
+    final ESQuery esQuery = rewrite(context);
+    final QueryHandle queryHandle = context.getQueryHandle();
+    final ESResultSet resultSet = esClient.execute(esQuery);
+    notifyComplIfRegistered(queryHandle);
+    return resultSet;
+  }
+
+  @Override
+  public void executeAsync(final QueryContext context) {
+    final Future<LensResultSet> futureResult
+      = asyncQueryPool.submit(new ESQueryExecuteCallable(context, SessionState.get()));
+    resultSetMap.put(context.getQueryHandle(), futureResult);
+  }
+
+  @Override
+  public void registerForCompletionNotification(QueryHandle handle, long timeoutMillis,
+                                                QueryCompletionListener listener) {
+    handleListenerMap.put(handle, listener);
+  }
+
+  @Override
+  public void updateStatus(QueryContext context) {
+    final QueryHandle queryHandle = context.getQueryHandle();
+    final Future<LensResultSet> lensResultSetFuture = resultSetMap.get(queryHandle);
+    if (lensResultSetFuture == null) {
+      context.getDriverStatus().setState(DriverQueryStatus.DriverQueryState.CLOSED);
+      context.getDriverStatus().setStatusMessage(queryHandle + " closed");
+      context.getDriverStatus().setResultSetAvailable(false);
+    } else if (lensResultSetFuture.isDone()) {
+      context.getDriverStatus().setState(DriverQueryStatus.DriverQueryState.SUCCESSFUL);
+      context.getDriverStatus().setStatusMessage(queryHandle + " successful");
+      context.getDriverStatus().setResultSetAvailable(true);
+    } else if (lensResultSetFuture.isCancelled()) {
+      context.getDriverStatus().setState(DriverQueryStatus.DriverQueryState.CANCELED);
+      context.getDriverStatus().setStatusMessage(queryHandle + " cancelled");
+      context.getDriverStatus().setResultSetAvailable(false);
+    }
+  }
+
+  @Override
+  public LensResultSet fetchResultSet(QueryContext context) throws LensException {
+    try {
+      /**
+       * removing the result set as soon as the fetch is done
+       */
+      return resultSetMap.remove(context.getQueryHandle()).get();
+    } catch (NullPointerException e) {
+      throw new LensException("The results for the query "
+        + context.getQueryHandleString()
+        + "has already been fetched");
+    } catch (InterruptedException | ExecutionException e) {
+      throw new LensException("Error fetching result set!", e);
+    }
+  }
+
+  @Override
+  public void closeResultSet(QueryHandle handle) throws LensException {
+    try {
+      resultSetMap.remove(handle);
+    } catch (NullPointerException e) {
+      throw new LensException("The query does not exist or was already purged", e);
+    }
+  }
+
+  @Override
+  public boolean cancelQuery(QueryHandle handle) throws LensException {
+    try {
+      boolean cancelled = resultSetMap.get(handle).cancel(true);
+      if (cancelled) {
+        notifyQueryCancellation(handle);
+      }
+      return cancelled;
+    } catch (NullPointerException e) {
+      throw new LensException("The query does not exist or was already purged", e);
+    }
+  }
+
+  @Override
+  public void closeQuery(QueryHandle handle) throws LensException {
+    cancelQuery(handle);
+    closeResultSet(handle);
+    handleListenerMap.remove(handle);
+  }
+
+  @Override
+  public void close() throws LensException {
+    for(QueryHandle handle : resultSetMap.keySet()) {
+      try {
+        closeQuery(handle);
+      } catch (LensException e) {
+        log.error("Error while closing query {}", handle.getHandleIdString(), e);
+      }
+    }
+  }
+
+  @Override
+  public void registerDriverEventListener(LensEventListener<DriverEvent> driverEventListener) {
+
+  }
+
+  @Override
+  public ImmutableSet<QueryLaunchingConstraint> getQueryConstraints() {
+    return ImmutableSet.copyOf(Sets.<QueryLaunchingConstraint>newHashSet());
+  }
+
+  @Override
+  public ImmutableSet<WaitingQueriesSelectionPolicy> getWaitingQuerySelectionPolicies() {
+    return ImmutableSet.copyOf(Sets.<WaitingQueriesSelectionPolicy>newHashSet());
+  }
+
+  private void notifyComplIfRegistered(QueryHandle queryHandle) {
+    try {
+      handleListenerMap.get(queryHandle).onCompletion(queryHandle);
+    } catch (NullPointerException e) {
+      log.debug("There are no subscriptions for notification. Skipping for {}", queryHandle.getHandleIdString(), e);
+    }
+  }
+
+  private void notifyQueryCancellation(QueryHandle handle) {
+    try {
+      handleListenerMap.get(handle).onError(handle, handle + " cancelled");
+    } catch (NullPointerException e) {
+      log.debug("There are no subscriptions for notification. Skipping for {}", handle.getHandleIdString(), e);
+    }
+  }
+
+  private ESQuery rewrite(AbstractQueryContext context) throws LensException {
+    final String key = keyFor(context);
+    if (rewrittenQueriesCache.containsKey(key)) {
+      return rewrittenQueriesCache.get(key);
+    } else {
+      final ASTNode rootQueryNode = HQLParser.parseHQL(context.getDriverQuery(this), new HiveConf());
+      setIndexAndTypeIfNotPresent(context, rootQueryNode);
+      final ESQuery esQuery = ESVisitor.rewrite(config, rootQueryNode);
+      rewrittenQueriesCache.put(key, esQuery);
+      return esQuery;
+    }
+  }
+
+  private void setIndexAndTypeIfNotPresent(AbstractQueryContext context, ASTNode rootQueryNode) throws LensException {
+    final ASTNode dbSchemaTable = HQLParser.findNodeByPath(
+      rootQueryNode,
+      HiveParser.TOK_FROM,
+      HiveParser.TOK_TABREF,
+      HiveParser.TOK_TABNAME);
+    try {
+      Validate.notNull(dbSchemaTable);
+      if (dbSchemaTable.getChildren().size() == 2) {
+        /**
+         * Index and type is already set here
+         */
+        return;
+      }
+      /**
+       * Get the table name, check metastore and set index and actual table name
+       */
+      final Tree firstChild = dbSchemaTable.getChild(0);
+      final String lensTable = firstChild.getText();
+      final Table tbl = CubeMetastoreClient.getInstance(context.getHiveConf()).getHiveTable(lensTable);
+      final String index = tbl.getProperty(LensConfConstants.ES_INDEX_NAME);
+      final String type = tbl.getProperty(LensConfConstants.ES_TYPE_NAME);
+      Validate.notNull(index, LensConfConstants.ES_INDEX_NAME + " property missing in table definition");
+      Validate.notNull(type, LensConfConstants.ES_TYPE_NAME + " property missing in table definition");
+      ((ASTNode) firstChild).getToken().setText(type);
+      final ASTNode indexIdentifier = new ASTNode(new CommonToken(HiveParser.Identifier, index));
+      indexIdentifier.setParent(dbSchemaTable);
+      dbSchemaTable.insertChild(0, indexIdentifier);
+    } catch (HiveException e) {
+      throw new LensException("Error occured when trying to communicate with metastore");
+    }
+  }
+
+  private String keyFor(AbstractQueryContext context) {
+    return String.valueOf(context.getFinalDriverQuery(this)!=null) + ":" + context.getDriverQuery(this);
+  }
+
+  ESClient getESClient() {
+    return esClient;
+  }
+
+  @Override
+  public void configure(Configuration conf) throws LensException {
+    this.conf = new Configuration(conf);
+    this.conf.addResource("esdriver-default.xml");
+    this.conf.addResource("esdriver-site.xml");
+    config = new ESDriverConfig(this.conf);
+    Class klass;
+    try {
+      klass = Class.forName(this.conf.get(ESDriverConfig.CLIENT_CLASS_KEY));
+      if (klass != null) {
+        log.debug("Picked up class {}", klass);
+        if (ESClient.class.isAssignableFrom(klass)) {
+          final Constructor constructor = klass.getConstructor(ESDriverConfig.class, Configuration.class);
+          esClient = (ESClient) constructor.newInstance(config, this.conf);
+          log.debug("Successfully instantiated es client of type {}", klass);
+        }
+      } else {
+        log.debug("Client class not provided, falling back to the default Jest client");
+        esClient = new JestClientImpl(config, conf);
+      }
+    } catch (ClassNotFoundException
+      | NoSuchMethodException
+      | InstantiationException
+      | IllegalAccessException
+      | InvocationTargetException e) {
+      log.error("ES driver cannot start!", e);
+      throw new LensException("Cannot start es driver", e);
+    }
+    log.debug("ES Driver configured");
+    asyncQueryPool = Executors.newCachedThreadPool(new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable runnable) {
+        Thread th = new Thread(runnable);
+        th.setName("lens-driver-es-" + THID.incrementAndGet());
+        return th;
+      }
+    });
+  }
+
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    /**
+     * This flow could be abstracted out at the driver level
+     */
+  }
+
+  @Override
+  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    /**
+     * This flow could be abstracted out at the driver level
+     */
+  }
+
+  protected class ESQueryExecuteCallable implements Callable<LensResultSet> {
+
+    private final QueryContext queryContext;
+    private final SessionState sessionState;
+
+    public ESQueryExecuteCallable(QueryContext queryContext, SessionState sessionState) {
+      this.queryContext = queryContext;
+      this.sessionState = sessionState;
+    }
+
+    @Override
+    public LensResultSet call() throws Exception {
+      SessionState.setCurrentSessionState(sessionState);
+      return execute(queryContext);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESDriverConfig.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESDriverConfig.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESDriverConfig.java
new file mode 100644
index 0000000..df01fe6
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESDriverConfig.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es;
+
+import org.apache.lens.driver.es.translator.ASTVisitor;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.ImmutableMap;
+import lombok.Getter;
+
+/**
+ * The constants used by ESDriver and es re-writers
+ */
+public final class ESDriverConfig {
+  public static final String CLIENT_CLASS_KEY = "lens.driver.es.client.class";
+  public static final String MAX_ROW_SIZE_KEY = "lens.driver.es.max.row.size";
+  public static final String TERM_FETCH_SIZE_KEY = "lens.driver.es.term.fetch.size";
+  public static final String AGGR_BUCKET_SIZE_LENS_KEY = "lens.driver.es.aggr.bucket.size";
+  public static final String QUERY_TIME_OUT_LENS_KEY = "lens.driver.es.query.timeout.millis";
+
+  public static final String AGGS = "aggs";
+  public static final String MATCH_ALL = "match_all";
+  public static final String TERMS = "terms";
+  public static final String TERM = "term";
+  public static final String FIELD = "field";
+  public static final String FILTER = "filter";
+  public static final String FROM = "from";
+  public static final String RANGE = "range";
+  public static final String FILTER_WRAPPER = "filter_wrapper";
+  public static final String FIELDS = "fields";
+  public static final String TERM_SORT = "sort";
+  public static final String SIZE = "size";
+  public static final String QUERY_TIME_OUT_STRING = "timeout";
+
+  public static final ImmutableMap<String, String> LOGICAL_OPS;
+  public static final ImmutableMap<String, String> RANGE_PREDICATES;
+  public static final ImmutableMap<String, String> PREDICATES;
+  public static final ImmutableMap<String, String> AGGREGATIONS;
+  public static final ImmutableMap<ASTVisitor.OrderBy, String> ORDER_BYS;
+  public static final int AGGR_TERM_FETCH_SIZE = 0;
+  public static final int DEFAULT_TERM_QUERY_OFFSET = 0;
+  private static final int DEFAULT_TERM_QUERY_LIMIT = -1;
+  private static final int AGGR_BUCKET_SIZE_DEFAULT = 10000;
+  private static final int QUERY_TIME_OUT_MS_DEFAULT = 10000;
+
+  private static final int TERM_FETCH_SIZE_DEFAULT = 5000;
+
+  static {
+    final ImmutableMap.Builder<String, String> logicalOpsBuilder = ImmutableMap.builder();
+    logicalOpsBuilder.put("and", "and");
+    logicalOpsBuilder.put("&&", "and");
+    logicalOpsBuilder.put("&", "and");
+    logicalOpsBuilder.put("or", "or");
+    logicalOpsBuilder.put("||", "or");
+    logicalOpsBuilder.put("|", "or");
+    logicalOpsBuilder.put("!", "not");
+    LOGICAL_OPS = logicalOpsBuilder.build();
+
+    final ImmutableMap.Builder<String, String> predicatesBuilder = ImmutableMap.builder();
+    predicatesBuilder.put(">", "gt");
+    predicatesBuilder.put(">=", "gte");
+    predicatesBuilder.put("<", "lt");
+    predicatesBuilder.put("<=", "lte");
+    predicatesBuilder.put("between", "range");
+    RANGE_PREDICATES = predicatesBuilder.build();
+    predicatesBuilder.put("=", "term");
+    predicatesBuilder.put("in", "terms");
+    PREDICATES = predicatesBuilder.build();
+
+    final ImmutableMap.Builder<String, String> aggregationsBuilder = ImmutableMap.builder();
+    aggregationsBuilder.put("count", "value_count");
+    aggregationsBuilder.put("count_distinct", "cardinality");
+    aggregationsBuilder.put("max", "max");
+    aggregationsBuilder.put("sum", "sum");
+    aggregationsBuilder.put("min", "min");
+    aggregationsBuilder.put("avg", "avg");
+    aggregationsBuilder.put("percentile", "percentiles");
+    AGGREGATIONS = aggregationsBuilder.build();
+
+    final ImmutableMap.Builder<ASTVisitor.OrderBy, String> orderByBuilder = ImmutableMap.builder();
+    orderByBuilder.put(ASTVisitor.OrderBy.ASC, "asc");
+    orderByBuilder.put(ASTVisitor.OrderBy.DESC, "desc");
+    ORDER_BYS = orderByBuilder.build();
+  }
+
+  @Getter
+  private final int maxLimit;
+  @Getter
+  private final int aggrBucketSize;
+  @Getter
+  private final int queryTimeOutMs;
+  private final int termFetchSize;
+
+  public int getTermFetchSize() {
+    return termFetchSize;
+  }
+
+  public ESDriverConfig(Configuration conf) {
+    maxLimit = conf.getInt(MAX_ROW_SIZE_KEY, DEFAULT_TERM_QUERY_LIMIT);
+    aggrBucketSize = conf.getInt(AGGR_BUCKET_SIZE_LENS_KEY, AGGR_BUCKET_SIZE_DEFAULT);
+    queryTimeOutMs = conf.getInt(QUERY_TIME_OUT_LENS_KEY, QUERY_TIME_OUT_MS_DEFAULT);
+    termFetchSize = conf.getInt(TERM_FETCH_SIZE_KEY, TERM_FETCH_SIZE_DEFAULT);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESQuery.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESQuery.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESQuery.java
new file mode 100644
index 0000000..32e3156
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESQuery.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es;
+
+import com.google.common.collect.ImmutableList;
+import lombok.Data;
+
+@Data
+public final class ESQuery {
+  public enum QueryType {AGGR, TERM}
+  /**
+   * index on which the query will get executed
+   */
+  private final String index;
+  /**
+   * type on which the query will get executed
+   */
+  private final String type;
+  /**
+   * the query
+   */
+  private final String query;
+  /**
+   * aliases in the query (ordered)
+   */
+  private final ImmutableList<String> schema;
+  /**
+   * Columns/expressions in the query (ordered)
+   */
+  private final ImmutableList<String> columns;
+  /**
+   * Type of the query, aggregate or term;
+   */
+  private final QueryType queryType;
+  /**
+   * size of the query;
+   */
+  private final int limit;
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESClient.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESClient.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESClient.java
new file mode 100644
index 0000000..5363a94
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESClient.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.client;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.lens.api.query.ResultRow;
+import org.apache.lens.driver.es.ESDriverConfig;
+import org.apache.lens.driver.es.ESQuery;
+import org.apache.lens.driver.es.exceptions.ESClientException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import lombok.NonNull;
+
+public abstract class ESClient {
+
+  @NonNull
+  protected final ESDriverConfig esDriverConfig;
+
+  private ExecutionMode getExecutionModeFor(ESQuery esQuery) {
+    return esQuery.getQueryType().equals(ESQuery.QueryType.AGGR)
+      ?
+      new DefaultExecutionMode(esQuery)
+      :
+      new ScrollingExecutionMode(esQuery);
+  }
+
+  protected abstract ESResultSet executeImpl(ESQuery esQuery) throws ESClientException;
+
+  private abstract static class ExecutionMode {
+
+    @NonNull
+    protected final ESQuery esQuery;
+
+    ExecutionMode(ESQuery query) {
+      this.esQuery = query;
+    }
+
+    abstract ESResultSet executeInternal() throws ESClientException;
+
+  }
+
+  private class ScrollingExecutionMode extends ExecutionMode {
+
+    @NonNull
+    final JsonObject jsonQuery;
+
+    ScrollingExecutionMode(ESQuery query) {
+      super(query);
+      jsonQuery = (JsonObject) new JsonParser().parse(query.getQuery());
+    }
+
+    ESQuery modify(int offset, int limit) {
+      jsonQuery.addProperty(ESDriverConfig.FROM, offset);
+      jsonQuery.addProperty(ESDriverConfig.SIZE, limit);
+      return new ESQuery(
+        esQuery.getIndex(),
+        esQuery.getType(),
+        jsonQuery.toString(),
+        ImmutableList.copyOf(esQuery.getSchema()),
+        ImmutableList.copyOf(esQuery.getColumns()),
+        esQuery.getQueryType(),
+        esQuery.getLimit()
+      );
+    }
+
+
+    @Override
+    ESResultSet executeInternal() throws ESClientException{
+      final ESResultSet resultSet = executeImpl(esQuery);
+      final int fetchSize = esDriverConfig.getTermFetchSize();
+      final int limit = esQuery.getLimit();
+      return new ESResultSet(
+        limit,
+        new Iterable<ResultRow>() {
+          ESResultSet batch = resultSet;
+          int processed = 0;
+
+          final ESResultSet getNextBatch() throws ESClientException {
+            final int toProcess = limit - processed;
+            final int newFetchSize = limit == -1 || fetchSize < toProcess
+              ?
+              fetchSize
+              :
+              toProcess;
+            batch = executeImpl(modify(processed, newFetchSize));
+            return batch;
+          }
+
+
+          @Override
+          public Iterator<ResultRow> iterator() {
+            return new Iterator<ResultRow>() {
+              @Override
+              public boolean hasNext() {
+                try {
+                  return processed < limit
+                    && (batch.hasNext() || getNextBatch().size > 0);
+                } catch (ESClientException e) {
+                  throw new RuntimeException("Encountered a runtime issue during execution", e);
+                }
+              }
+
+              @Override
+              public ResultRow next() {
+                if (!hasNext()) {
+                  throw new NoSuchElementException("Processed : " + processed + ", Limit : " + limit);
+                }
+                final ResultRow nextRow = batch.next();
+                processed++;
+                return nextRow;
+              }
+
+              @Override
+              public void remove() {
+                throw new UnsupportedOperationException("Cannot remove from es resultset!");
+              }
+            };
+          }
+        },
+        resultSet.getMetadata()
+      );
+    }
+  }
+
+  private class DefaultExecutionMode extends ExecutionMode {
+
+    DefaultExecutionMode(ESQuery query) {
+      super(query);
+    }
+
+    @Override
+    ESResultSet executeInternal() throws ESClientException {
+      return executeImpl(esQuery);
+    }
+
+  }
+
+  public ESClient(ESDriverConfig esDriverConfig, Configuration conf) {
+    this.esDriverConfig = esDriverConfig;
+  }
+
+  public final ESResultSet execute(final ESQuery esQuery) throws ESClientException {
+    return getExecutionModeFor(esQuery).executeInternal();
+  }
+
+  public abstract String explain(ESQuery esQuery) throws ESClientException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESResultSet.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESResultSet.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESResultSet.java
new file mode 100644
index 0000000..4ba2321
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESResultSet.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.client;
+
+import java.util.Iterator;
+
+import org.apache.lens.api.query.ResultRow;
+import org.apache.lens.server.api.driver.InMemoryResultSet;
+import org.apache.lens.server.api.driver.LensResultSetMetadata;
+import org.apache.lens.server.api.error.LensException;
+
+import lombok.NonNull;
+
+/**
+ * The class ESResultset for iterating over elastic search result set
+ */
+public class ESResultSet extends InMemoryResultSet {
+
+  @NonNull
+  final Iterator<ResultRow> resultSetIterator;
+  @NonNull
+  final LensResultSetMetadata resultSetMetadata;
+  final int size;
+
+  public ESResultSet(int size, final Iterable<ResultRow> resultSetIterable, final LensResultSetMetadata metadata) {
+    this.size = size;
+    this.resultSetIterator = resultSetIterable.iterator();
+    this.resultSetMetadata = metadata;
+  }
+
+  @Override
+  public boolean hasNext(){
+    return resultSetIterator.hasNext();
+  }
+
+  @Override
+  public ResultRow next() {
+    return resultSetIterator.next();
+  }
+
+  @Override
+  public void setFetchSize(int i) {
+
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  @Override
+  public LensResultSetMetadata getMetadata() {
+    return resultSetMetadata;
+  }
+
+  @Override
+  public boolean seekToStart() throws LensException {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/jest/JestClientImpl.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/jest/JestClientImpl.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/jest/JestClientImpl.java
new file mode 100644
index 0000000..35dc070
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/jest/JestClientImpl.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.client.jest;
+
+import org.apache.lens.driver.es.ESDriverConfig;
+import org.apache.lens.driver.es.ESQuery;
+import org.apache.lens.driver.es.client.ESClient;
+import org.apache.lens.driver.es.client.ESResultSet;
+import org.apache.lens.driver.es.exceptions.ESClientException;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.hadoop.conf.Configuration;
+
+import io.searchbox.client.JestClient;
+import io.searchbox.client.JestClientFactory;
+import io.searchbox.client.config.HttpClientConfig;
+import io.searchbox.core.Explain;
+import io.searchbox.core.Search;
+import io.searchbox.core.SearchResult;
+import lombok.NonNull;
+
+/**
+ * The ESRestClient for firing queries on elastic search
+ */
+public class JestClientImpl extends ESClient {
+
+  private static final int DEFAULT_MAX_CONN = 10;
+  private static final boolean DEFAULT_MULTI_THREADED = true;
+  private static final String IS_MULTITHREADED = "lens.driver.es.jest.is.multi.threaded";
+  private static final String MAX_TOTAL_CONN = "lens.driver.es.jest.max.conn";
+  private static final String ES_SERVERS = "lens.driver.es.jest.servers";
+
+  @NonNull
+  private final JestClient client;
+
+  public JestClientImpl(ESDriverConfig esDriverConfig, Configuration conf) {
+    super(esDriverConfig, conf);
+    final JestClientFactory factory = new JestClientFactory();
+    factory.setHttpClientConfig(new HttpClientConfig
+      .Builder(Validate.notNull(conf.getStringCollection(ES_SERVERS)))
+      .maxTotalConnection(conf.getInt(MAX_TOTAL_CONN, DEFAULT_MAX_CONN))
+      .multiThreaded(conf.getBoolean(IS_MULTITHREADED, DEFAULT_MULTI_THREADED))
+      .readTimeout(esDriverConfig.getQueryTimeOutMs())
+      .build());
+    client = factory.getObject();
+  }
+
+  @Override
+  public ESResultSet executeImpl(ESQuery esQuery) throws ESClientException {
+    try {
+      final Search search = new Search.Builder(esQuery.getQuery())
+        .addIndex(esQuery.getIndex())
+        .addType(esQuery.getType())
+        .build();
+      final SearchResult result = client.execute(search);
+      if (result == null) {
+        throw new NullPointerException("Got null result from client for " + esQuery);
+      }
+      return JestResultSetTransformer.transformFrom(
+        result.getJsonObject(), esQuery.getSchema(), esQuery.getColumns());
+    } catch (Exception e) {
+      throw new ESClientException("Execution failed, ", e);
+    }
+  }
+
+  public String explain(ESQuery esQuery) throws ESClientException {
+    try {
+      return client
+        .execute(new Explain
+          .Builder(esQuery.getIndex(), esQuery.getType(), null, esQuery.getQuery())
+          .build())
+        .getJsonString();
+    } catch (Exception e) {
+      throw new ESClientException("Explanation failed, ", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/jest/JestResultSetTransformer.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/jest/JestResultSetTransformer.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/jest/JestResultSetTransformer.java
new file mode 100644
index 0000000..af313da
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/jest/JestResultSetTransformer.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.client.jest;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lens.api.query.ResultRow;
+import org.apache.lens.driver.es.client.ESResultSet;
+import org.apache.lens.server.api.driver.LensResultSetMetadata;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.hive.service.cli.ColumnDescriptor;
+import org.apache.hive.service.cli.Type;
+import org.apache.hive.service.cli.TypeDescriptor;
+
+import com.google.common.collect.Lists;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import lombok.NonNull;
+
+/**
+ * The class ESResultSetTransformer, transforms json responses of elastic search
+ * to 2D result sets that lens expects
+ */
+public abstract class JestResultSetTransformer {
+
+  /**
+   * The class AggregateTransformer, takes care of transforming results of
+   * aggregate queries
+   */
+  static class AggregateTransformer extends JestResultSetTransformer {
+
+    private List<ResultRow> rows = Lists.newArrayList();
+
+    public AggregateTransformer(JsonObject result, List<String> schema, List<String> selectedColumns) {
+      super(result, schema, selectedColumns);
+    }
+
+    /**
+     * The json response from jest will be nested in case of group bys
+     *  g1-val           (group 1)
+     *  |      \
+     *  g2-val1  g2-val2 (group 2)
+     *  |        |
+     *  5, 10    10, 15  (metrics)
+     *
+     *  The method will traverse the tree and collect all the paths ->
+     *  keyCol1 keyCol2  aggr1 aggr2
+     *  g1-val  g2-val1  5     10
+     *  g1-val  g2-val2  10    15
+     *
+     * @param object, json response of jest
+     * @param currentPath, current row
+     * @param length, length of the current row
+     * @param keyCol, Col name of the keys that will be parsed in the recursion
+     */
+    private void collectAllRows(JsonObject object, List<Object> currentPath, int length, String keyCol) {
+      for (Map.Entry<String, JsonElement> entry : object.entrySet()) {
+        JsonElement element = entry.getValue();
+        final String key = entry.getKey();
+        if (key.equals(ResultSetConstants.KEY_STRING)) {
+          Validate.isTrue(keyCol != null, "Key not available");
+          currentPath.set(schema.indexOf(keyCol), entry.getValue().getAsString());
+          length++;
+          if (length == schema.size()) {
+            rows.add(new ResultRow(Lists.newArrayList(currentPath)));
+          }
+        } else if (element instanceof JsonObject && ((JsonObject) element).get(ResultSetConstants.VALUE_KEY) != null) {
+          currentPath.set(schema.indexOf(key),
+            ((JsonObject) element).get(ResultSetConstants.VALUE_KEY).getAsString());
+          length++;
+          if (length == schema.size()) {
+            rows.add(new ResultRow(Lists.newArrayList(currentPath)));
+          }
+        } else if (element instanceof JsonObject) {
+          collectAllRows((JsonObject) element, Lists.newArrayList(currentPath), length, key);
+        } else if (element instanceof JsonArray && key.equals(ResultSetConstants.BUCKETS_KEY)) {
+          JsonArray array = (JsonArray) element;
+          for (JsonElement arrayElement : array) {
+            collectAllRows((JsonObject) arrayElement, Lists.newArrayList(currentPath), length, keyCol);
+          }
+        }
+      }
+    }
+
+    @Override
+    public ESResultSet transform() {
+      collectAllRows(
+        result.getAsJsonObject(ResultSetConstants.AGGREGATIONS_KEY)
+          .getAsJsonObject(ResultSetConstants.FILTER_WRAPPER_KEY),
+        getEmptyRow(),
+        0,
+        null
+      );
+      return new ESResultSet(
+        rows.size(),
+        rows,
+        getMetaData(schema)
+      );
+    }
+  }
+
+  /**
+   * The class TermTransformer, takes care of transforming results of simple select queries
+   */
+  static class TermTransformer extends JestResultSetTransformer {
+
+    public TermTransformer(JsonObject result, List<String> schema, List<String> selectedColumns) {
+      super(result, schema, selectedColumns);
+    }
+
+    @Override
+    public ESResultSet transform() {
+
+      JsonArray jsonArray = result
+        .getAsJsonObject(ResultSetConstants.HITS_KEY)
+        .getAsJsonArray(ResultSetConstants.HITS_KEY);
+
+      final List<ResultRow> rows = Lists.newArrayList();
+      for (JsonElement element : jsonArray) {
+        final List<Object> objects = getEmptyRow();
+        for (Map.Entry<String, JsonElement> entry
+          : element
+            .getAsJsonObject()
+            .getAsJsonObject(ResultSetConstants.FIELDS_KEY)
+            .entrySet()) {
+          objects.set(
+            selectedColumns.indexOf(entry.getKey())
+            , entry.getValue().getAsString()
+          );
+        }
+        rows.add(new ResultRow(objects));
+      }
+      return new ESResultSet(rows.size(), rows, getMetaData(schema));
+    }
+
+
+  }
+
+  @NonNull
+  protected final JsonObject result;
+  @NonNull
+  protected final List<String> schema;
+  @NonNull
+  protected final List<String> selectedColumns;
+
+  public JestResultSetTransformer(JsonObject result, List<String> schema, List<String> selectedColumns) {
+    this.schema = schema;
+    this.result = result;
+    this.selectedColumns = selectedColumns;
+  }
+
+  public static ESResultSet transformFrom(JsonObject jsonResult, List<String> schema, List<String> selectedColumns) {
+    if (jsonResult.getAsJsonObject(ResultSetConstants.AGGREGATIONS_KEY) != null) {
+      return new AggregateTransformer(jsonResult, schema, selectedColumns).transform();
+    } else {
+      return new TermTransformer(jsonResult, schema, selectedColumns).transform();
+    }
+  }
+
+  protected List<Object> getEmptyRow() {
+    List<Object> objects = Lists.newArrayList();
+    int i = 0;
+    while (i++ < schema.size()) {
+      objects.add(null);
+    }
+    return objects;
+  }
+
+  public abstract ESResultSet transform();
+
+  protected LensResultSetMetadata getMetaData(final List<String> schema) {
+    return new LensResultSetMetadata() {
+      @Override
+      public List<ColumnDescriptor> getColumns() {
+        List<ColumnDescriptor> descriptors = Lists.newArrayList();
+        int i = 0;
+        for (final String col : schema) {
+          descriptors.add(
+            new ColumnDescriptor(col, col, new TypeDescriptor(Type.STRING_TYPE), i++)
+          );
+        }
+        return descriptors;
+      }
+    };
+  }
+
+  protected static class ResultSetConstants {
+    public static final String AGGREGATIONS_KEY = "aggregations";
+    public static final String KEY_STRING = "key";
+    public static final String VALUE_KEY = "value";
+    public static final String BUCKETS_KEY = "buckets";
+    public static final String FILTER_WRAPPER_KEY = "filter_wrapper";
+    public static final String HITS_KEY = "hits";
+    public static final String FIELDS_KEY = "fields";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/exceptions/ESClientException.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/exceptions/ESClientException.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/exceptions/ESClientException.java
new file mode 100644
index 0000000..c2254f1
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/exceptions/ESClientException.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.exceptions;
+
+import org.apache.lens.server.api.error.LensException;
+
+import lombok.NonNull;
+
+public class ESClientException extends LensException {
+  public ESClientException(String s, Exception e) {
+    super(s, e);
+  }
+
+  public ESClientException(String errorMsg) {
+    super(errorMsg);
+  }
+
+  public ESClientException(String errorMsg, Throwable cause) {
+    super(errorMsg, cause);
+  }
+
+  public ESClientException() {
+  }
+
+  public ESClientException(Throwable cause) {
+    super(cause);
+  }
+
+  public ESClientException(int errorCode) {
+    super(errorCode);
+  }
+
+  public ESClientException(String errorMsg, int errorCode) {
+    super(errorMsg, errorCode);
+  }
+
+  public ESClientException(int errorCode, Throwable cause,
+                           @NonNull Object... errorMsgFormattingArgs) {
+    super(errorCode, cause, errorMsgFormattingArgs);
+  }
+
+  public ESClientException(String errorMsg, int errorcode, Throwable cause,
+                           @NonNull Object... errorMsgFormattingArgs) {
+    super(errorMsg, errorcode, cause, errorMsgFormattingArgs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/exceptions/InvalidQueryException.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/exceptions/InvalidQueryException.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/exceptions/InvalidQueryException.java
new file mode 100644
index 0000000..8127cba
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/exceptions/InvalidQueryException.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.exceptions;
+
+import org.apache.lens.server.api.error.LensException;
+
+import lombok.NonNull;
+
+public class InvalidQueryException extends LensException {
+  static final String BASE_EXCEPTION_MESSAGE = "AST traversal failed!";
+
+  public InvalidQueryException(String s) {
+    super(BASE_EXCEPTION_MESSAGE + s);
+  }
+
+  public InvalidQueryException(String errorMsg, Throwable cause) {
+    super(errorMsg, cause);
+  }
+
+  public InvalidQueryException() {
+  }
+
+  public InvalidQueryException(Throwable cause) {
+    super(cause);
+  }
+
+  public InvalidQueryException(int errorCode) {
+    super(errorCode);
+  }
+
+  public InvalidQueryException(String errorMsg, int errorCode) {
+    super(errorMsg, errorCode);
+  }
+
+  public InvalidQueryException(int errorCode, Throwable cause,
+                               @NonNull Object... errorMsgFormattingArgs) {
+    super(errorCode, cause, errorMsgFormattingArgs);
+  }
+
+  public InvalidQueryException(String errorMsg, int errorcode, Throwable cause,
+                               @NonNull Object... errorMsgFormattingArgs) {
+    super(errorMsg, errorcode, cause, errorMsgFormattingArgs);
+  }
+
+  public InvalidQueryException(String s, Exception e) {
+    super(BASE_EXCEPTION_MESSAGE + s, e);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ASTCriteriaVisitor.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ASTCriteriaVisitor.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ASTCriteriaVisitor.java
new file mode 100644
index 0000000..a0bd7c8
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ASTCriteriaVisitor.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.translator;
+
+import java.util.List;
+
+import org.apache.lens.driver.es.exceptions.InvalidQueryException;
+
+public interface ASTCriteriaVisitor {
+  /**
+   *
+   * @param logicalOp The unary operator
+   * @param visitedSubTree The visited subtree of unary operator
+   */
+  void visitUnaryLogicalOp(String logicalOp, ASTCriteriaVisitor visitedSubTree) throws InvalidQueryException;
+
+  /**
+   *
+   * @param logicalOp The logical operator
+   * @param visitedSubTrees The visited subtree of logical operator
+   */
+  void visitLogicalOp(String logicalOp, List<ASTCriteriaVisitor> visitedSubTrees) throws InvalidQueryException;
+
+  /**
+   * Visits simple predicate
+   *
+   * @param conditionalOp Operator constructing the predicate
+   * @param leftCol Left column
+   * @param rightExps Right expression
+   */
+  void visitPredicate(String conditionalOp, String leftCol, List<String> rightExps) throws InvalidQueryException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ASTVisitor.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ASTVisitor.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ASTVisitor.java
new file mode 100644
index 0000000..f654a60
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ASTVisitor.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.translator;
+
+/**
+ * The visitor interface for ASTInorderTraversal
+ * It covers only simple queries (no joins or sub queries)
+ * Also it does not handle complicated expressions
+ */
+public interface ASTVisitor {
+
+  enum OrderBy {ASC, DESC}
+
+  void visitSimpleSelect(String columnName, String alias);
+
+  void visitAggregation(String aggregationType, String columnName, String alias);
+
+  void visitFrom(String database, String table);
+
+  void visitCriteria(ASTCriteriaVisitor visitedSubTree);
+
+  void visitGroupBy(String colName);
+
+  void visitOrderBy(String colName, OrderBy orderBy);
+
+  void visitLimit(int limit);
+
+  void completeVisit();
+
+  void visitAllCols();
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/CriteriaVisitorFactory.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/CriteriaVisitorFactory.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/CriteriaVisitorFactory.java
new file mode 100644
index 0000000..92ec10f
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/CriteriaVisitorFactory.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.translator;
+
+/**
+ * The interface CriteriaVisitorFactory that produces instances of type ASTCriteriaVisitor
+ */
+public interface CriteriaVisitorFactory {
+  ASTCriteriaVisitor getInstance();
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ESVisitor.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ESVisitor.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ESVisitor.java
new file mode 100644
index 0000000..441f6d6
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/ESVisitor.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.translator;
+
+import static org.apache.lens.driver.es.ESDriverConfig.MATCH_ALL;
+
+import static org.apache.hadoop.hive.ql.parse.HiveParser.*;
+
+import java.util.List;
+
+import org.apache.lens.cube.parse.HQLParser;
+import org.apache.lens.driver.es.ASTTraverserForES;
+import org.apache.lens.driver.es.ESDriverConfig;
+import org.apache.lens.driver.es.ESQuery;
+import org.apache.lens.driver.es.translator.impl.ESAggregateVisitor;
+import org.apache.lens.driver.es.translator.impl.ESCriteriaVisitor;
+import org.apache.lens.driver.es.translator.impl.ESCriteriaVisitorFactory;
+import org.apache.lens.driver.es.translator.impl.ESTermVisitor;
+import org.apache.lens.server.api.error.LensException;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import lombok.NonNull;
+
+/**
+ * The abstract visitor for constructing elastic search queries from ASTNode
+ */
+public abstract class ESVisitor implements ASTVisitor {
+  protected static final JsonNodeFactory JSON_NODE_FACTORY = JsonNodeFactory.instance;
+
+  protected final ObjectNode queryNode = JSON_NODE_FACTORY.objectNode();
+  protected final List<String> querySchema = Lists.newArrayList();
+  protected final List<String> selectedColumnNames = Lists.newArrayList();
+  @NonNull
+  protected final ESDriverConfig config;
+  protected ESQuery.QueryType queryType;
+
+  protected String index;
+  protected String type;
+  protected int limit;
+  protected JsonNode criteriaNode = getMatchAllNode();
+
+  protected ESVisitor(ESDriverConfig config) {
+    this.config = config;
+    limit = config.getMaxLimit();
+  }
+
+  private static JsonNode getMatchAllNode() {
+    final ObjectNode matchAllNode = JSON_NODE_FACTORY.objectNode();
+    matchAllNode.put(MATCH_ALL, JSON_NODE_FACTORY.objectNode());
+    return matchAllNode;
+  }
+
+  public static ESQuery rewrite(ESDriverConfig config, String hql) throws LensException {
+    ASTNode rootQueryNode;
+    try {
+      rootQueryNode = HQLParser.parseHQL(hql, new HiveConf());
+    } catch (Exception e) {
+      throw new ESRewriteException(e);
+    }
+    return rewrite(config, rootQueryNode);
+  }
+
+  public static ESQuery rewrite(ESDriverConfig config, ASTNode rootQueryNode) throws LensException {
+    final ESVisitor visitor = isAggregateQuery(rootQueryNode)
+      ?
+      new ESAggregateVisitor(config)
+      :
+      new ESTermVisitor(config);
+
+    new ASTTraverserForES(
+      rootQueryNode,
+      visitor,
+      new ESCriteriaVisitorFactory()
+    ).accept();
+
+    return visitor.getQuery();
+  }
+
+  /**
+   * Have to move them to proper util classes
+   * @param rootQueryNode, root node of AST
+   * @return if the query is aggregate
+   */
+  private static boolean isAggregateQuery(ASTNode rootQueryNode) {
+    return hasGroupBy(rootQueryNode) || areAllColumnsAggregate(rootQueryNode);
+  }
+
+  /**
+   * Have to move them to proper util functions
+   * @param rootQueryNode, root node of AST
+   * @return if all Cols are aggregate
+   */
+  private static boolean areAllColumnsAggregate(ASTNode rootQueryNode) {
+    boolean areAllColumnsAggregate = true;
+    final ASTNode selectNode = HQLParser.findNodeByPath(rootQueryNode, TOK_INSERT, TOK_SELECT);
+    for (Node selectExp : selectNode.getChildren()) {
+      final Node innerNode = selectExp.getChildren().get(0);
+      if (!innerNode.getName().equals(String.valueOf(TOK_FUNCTION))) {
+        areAllColumnsAggregate = false;
+        break;
+      }
+    }
+    return areAllColumnsAggregate;
+  }
+
+  private static boolean hasGroupBy(ASTNode rootQueryNode) {
+    return HQLParser.findNodeByPath(rootQueryNode, TOK_INSERT, TOK_GROUPBY) != null;
+  }
+
+  protected static String visitColumn(String cannonicalColName) {
+    final String[] colParts = cannonicalColName.split("\\.");
+    return colParts[colParts.length - 1];
+  }
+
+  @Override
+  public void visitFrom(String database, String table) {
+    Validate.notNull(database);
+    Validate.notNull(table);
+    Preconditions.checkNotNull(table);
+    index = database;
+    type = table;
+  }
+
+  @Override
+  public void visitLimit(int limit) {
+    this.limit = limit;
+  }
+
+  @Override
+  public void visitCriteria(ASTCriteriaVisitor visitedSubTree) {
+    criteriaNode = ((ESCriteriaVisitor) visitedSubTree).getNode();
+  }
+
+  @Override
+  public void visitAllCols() {
+    throw new UnsupportedOperationException("'*' is not supported in elastic search, select the columns required");
+  }
+
+  public ESQuery getQuery() {
+    Validate.isTrue(querySchema.size() == selectedColumnNames.size(),
+      "Query building seems to have gone wrong... schema and alias list size seems to be different");
+    return new ESQuery(index, type, queryNode.toString(), ImmutableList.copyOf(querySchema),
+      ImmutableList.copyOf(selectedColumnNames), queryType, limit);
+  }
+
+  private static class ESRewriteException extends RuntimeException {
+    public ESRewriteException(Exception e) {
+      super(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/bffa78c9/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/impl/ESAggregateVisitor.java
----------------------------------------------------------------------
diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/impl/ESAggregateVisitor.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/impl/ESAggregateVisitor.java
new file mode 100644
index 0000000..69f20d3
--- /dev/null
+++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/translator/impl/ESAggregateVisitor.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.driver.es.translator.impl;
+
+import java.util.Map;
+
+import org.apache.lens.driver.es.ESDriverConfig;
+import org.apache.lens.driver.es.ESQuery;
+import org.apache.lens.driver.es.translator.ESVisitor;
+
+import org.apache.commons.lang3.Validate;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Maps;
+
+/**
+ * Visitor for traversing aggregate elastic search queries
+ * that involve group by or simple aggregations
+ */
+public class ESAggregateVisitor extends ESVisitor {
+
+  private final ObjectNode groupByNode = JSON_NODE_FACTORY.objectNode();
+  private final ObjectNode aggNode = JSON_NODE_FACTORY.objectNode();
+  private ObjectNode currentGroupByNode = groupByNode;
+  private Map<String, String> groupByKeys = Maps.newHashMap();
+
+  public ESAggregateVisitor(ESDriverConfig config) {
+    super(config);
+    queryType = ESQuery.QueryType.AGGR;
+  }
+
+  @Override
+  public void visitSimpleSelect(String columnName, String alias) {
+    columnName = visitColumn(columnName);
+    final String aliasName = alias == null ? columnName : alias;
+    Validate.isTrue(!querySchema.contains(aliasName), "Ambiguous alias '" + aliasName + "'");
+    querySchema.add(aliasName);
+    selectedColumnNames.add(columnName);
+    groupByKeys.put(columnName, aliasName);
+  }
+
+  @Override
+  public void visitAggregation(String aggregationType, String columnName, String alias) {
+    columnName = visitColumn(columnName);
+    final String aliasName = alias == null ? columnName : alias;
+    querySchema.add(aliasName);
+    selectedColumnNames.add(columnName);
+    final ObjectNode aggMeasures = JSON_NODE_FACTORY.objectNode();
+    final ObjectNode fieldNode = JSON_NODE_FACTORY.objectNode();
+    fieldNode.put(ESDriverConfig.FIELD, columnName);
+    aggMeasures.put(ESDriverConfig.AGGREGATIONS.get(aggregationType), fieldNode);
+    aggNode.put(aliasName, aggMeasures);
+  }
+
+  @Override
+  public void visitGroupBy(String groupBy) {
+    groupBy = visitColumn(groupBy);
+    final ObjectNode aggNode = JSON_NODE_FACTORY.objectNode();
+    currentGroupByNode.put(ESDriverConfig.AGGS, aggNode);
+    final ObjectNode groupByNode = JSON_NODE_FACTORY.objectNode();
+    aggNode.put(
+      Validate.notNull(groupByKeys.get(groupBy), "Group by column has to be used in select")
+      , groupByNode);
+    final ObjectNode termsNode = JSON_NODE_FACTORY.objectNode();
+    groupByNode.put(ESDriverConfig.TERMS, termsNode);
+    termsNode.put(ESDriverConfig.FIELD, groupBy);
+    termsNode.put(ESDriverConfig.SIZE, config.getAggrBucketSize());
+    currentGroupByNode = groupByNode;
+  }
+
+  @Override
+  public void visitOrderBy(String colName, OrderBy orderBy) {
+    /**
+     * TODO, the feature is partially available in elasticsearch.
+     * http://tinyurl.com/p6e5upo
+     */
+    throw new UnsupportedOperationException("Order by cannot be used with aggregate queries");
+  }
+
+  @Override
+  public void completeVisit() {
+    queryNode.put(ESDriverConfig.SIZE, ESDriverConfig.AGGR_TERM_FETCH_SIZE);
+    queryNode.put(ESDriverConfig.QUERY_TIME_OUT_STRING, config.getQueryTimeOutMs());
+    final ObjectNode outerAggsNode = JSON_NODE_FACTORY.objectNode();
+    queryNode.put(ESDriverConfig.AGGS, outerAggsNode);
+    outerAggsNode.put(ESDriverConfig.FILTER_WRAPPER, groupByNode);
+    groupByNode.put(ESDriverConfig.FILTER, criteriaNode);
+    currentGroupByNode.put(ESDriverConfig.AGGS, aggNode);
+  }
+}