Posted to commits@impala.apache.org by ta...@apache.org on 2018/08/07 00:56:08 UTC

[1/5] impala git commit: IMPALA-7258: Support querying HBase tables in LocalCatalog

Repository: impala
Updated Branches:
  refs/heads/master a0673c058 -> df3f16584


http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/catalog/local/LocalHbaseTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalHbaseTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalHbaseTable.java
new file mode 100644
index 0000000..4ad2c14
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalHbaseTable.java
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.local;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.FeCatalogUtils;
+import org.apache.impala.catalog.FeHBaseTable;
+import org.apache.impala.common.Pair;
+import org.apache.impala.thrift.TResultSet;
+import org.apache.impala.thrift.TTableDescriptor;
+import org.apache.impala.thrift.TTableType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Set;
+
+public class LocalHbaseTable extends LocalTable implements FeHBaseTable {
+  // Name of table in HBase.
+  // 'this.name' is the alias of the HBase table in Hive.
+  // It's also referenced in msTbl, so it doesn't take additional space here.
+  private String hbaseTableName_;
+
+  // Cached column families. Used primarily for speeding up row stats estimation
+  // (see IMPALA-4211).
+  // TODO: revisit after caching is implemented for local catalog
+  private HColumnDescriptor[] columnFamilies_ = null;
+
+  private LocalHbaseTable(LocalDb db, Table msTbl, ColumnMap cols) {
+    super(db, msTbl, cols);
+    hbaseTableName_ = Util.getHBaseTableName(msTbl);
+  }
+
+  static LocalHbaseTable loadFromHbase(LocalDb db, Table msTable) {
+    try {
+      // Warm up the connection and verify the table exists.
+      Util.getHBaseTable(Util.getHBaseTableName(msTable)).close();
+      // Since we don't support composite HBase row keys yet, all HBase tables have a
+      // single clustering column.
+      return new LocalHbaseTable(db, msTable, new ColumnMap(Util.loadColumns(msTable), 1,
+          msTable.getDbName() + "." + msTable.getTableName()));
+    } catch (IOException | MetaException | SerDeException e) {
+      throw new LocalCatalogException(e);
+    }
+  }
+
+  @Override
+  public TTableDescriptor toThriftDescriptor(int tableId,
+      Set<Long> referencedPartitions) {
+    TTableDescriptor tableDescriptor =
+        new TTableDescriptor(tableId, TTableType.HBASE_TABLE,
+            FeCatalogUtils.getTColumnDescriptors(this), 1, getHBaseTableName(),
+            db_.getName());
+    tableDescriptor.setHbaseTable(Util.getTHBaseTable(this));
+    return tableDescriptor;
+  }
+
+  @Override
+  public Pair<Long, Long> getEstimatedRowStats(byte[] startRowKey, byte[] endRowKey) {
+    return Util.getEstimatedRowStats(this, startRowKey, endRowKey);
+  }
+
+  @Override
+  public String getHBaseTableName() {
+    return hbaseTableName_;
+  }
+
+  @Override
+  public TResultSet getTableStats() {
+    return Util.getTableStats(this);
+  }
+
+  @Override
+  public HColumnDescriptor[] getColumnFamilies() throws IOException {
+    if (columnFamilies_ == null) {
+      try (org.apache.hadoop.hbase.client.Table hBaseTable = Util
+          .getHBaseTable(getHBaseTableName())) {
+        columnFamilies_ = hBaseTable.getTableDescriptor().getColumnFamilies();
+      }
+    }
+    return columnFamilies_;
+  }
+
+  @Override
+  public ArrayList<Column> getColumnsInHiveOrder() {
+    return getColumns();
+  }
+
+  /**
+   * Returns the storage handler class for HBase tables read by Hive.
+   */
+  @Override
+  public String getStorageHandlerClassName() {
+    return Util.HBASE_STORAGE_HANDLER;
+  }
+
+}
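
For orientation, the class above is a thin adapter: all HBase-specific work is delegated to the static helpers in FeHBaseTable.Util, introduced further down in this commit. Below is a minimal sketch of how a caller might exercise the new interface; the class and method names are illustrative only and not part of the patch, and 'tbl' is assumed to be an already-loaded FeHBaseTable such as a LocalHbaseTable.

  import org.apache.hadoop.hbase.HConstants;
  import org.apache.impala.catalog.FeHBaseTable;
  import org.apache.impala.common.Pair;

  public class HBaseRowStatsSketch {
    // Print a rough row-count / bytes-per-row estimate over the whole key range.
    static void printEstimate(FeHBaseTable tbl) {
      Pair<Long, Long> stats = tbl.getEstimatedRowStats(
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      System.out.println(tbl.getHBaseTableName() + ": ~" + stats.first
          + " rows, ~" + stats.second + " bytes/row");
    }
  }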

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
index 57da5f2..dd843ae 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
@@ -74,9 +74,8 @@ abstract class LocalTable implements FeTable {
     if (TableType.valueOf(msTbl.getTableType()) == TableType.VIRTUAL_VIEW) {
       t = new LocalView(db, msTbl);
     } else if (HBaseTable.isHBaseTable(msTbl)) {
-      // TODO(todd) support HBase table
+      t = LocalHbaseTable.loadFromHbase(db, msTbl);
     } else if (KuduTable.isKuduTable(msTbl)) {
-      // TODO(todd) support kudu table
       t = LocalKuduTable.loadFromKudu(db, msTbl);
     } else if (DataSourceTable.isDataSourceTable(msTbl)) {
       // TODO(todd) support datasource table

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
index 5184f96..c35bc7a 100644
--- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
@@ -35,8 +35,8 @@ import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.SlotDescriptor;
 import org.apache.impala.analysis.StringLiteral;
 import org.apache.impala.analysis.TupleDescriptor;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.HBaseColumn;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.PrimitiveType;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaException;
@@ -192,7 +192,7 @@ public class HBaseScanNode extends ScanNode {
   @Override
   public void computeStats(Analyzer analyzer) {
     super.computeStats(analyzer);
-    HBaseTable tbl = (HBaseTable) desc_.getTable();
+    FeHBaseTable tbl = (FeHBaseTable) desc_.getTable();
 
     ValueRange rowRange = keyRanges_.get(0);
     if (isEmpty_) {
@@ -229,7 +229,7 @@ public class HBaseScanNode extends ScanNode {
 
   @Override
   protected String debugString() {
-    HBaseTable tbl = (HBaseTable) desc_.getTable();
+    FeHBaseTable tbl = (FeHBaseTable) desc_.getTable();
     return Objects.toStringHelper(this)
         .add("tid", desc_.getId().asInt())
         .add("hiveTblName", tbl.getFullName())
@@ -278,7 +278,7 @@ public class HBaseScanNode extends ScanNode {
   @Override
   protected void toThrift(TPlanNode msg) {
     msg.node_type = TPlanNodeType.HBASE_SCAN_NODE;
-    HBaseTable tbl = (HBaseTable) desc_.getTable();
+    FeHBaseTable tbl = (FeHBaseTable) desc_.getTable();
     msg.hbase_scan_node =
       new THBaseScanNode(desc_.getId().asInt(), tbl.getHBaseTableName());
     if (!filters_.isEmpty()) {
@@ -299,13 +299,10 @@ public class HBaseScanNode extends ScanNode {
     if (isEmpty_) return;
 
     // Retrieve relevant HBase regions and their region servers
-    HBaseTable tbl = (HBaseTable) desc_.getTable();
-    org.apache.hadoop.hbase.client.Table hbaseTbl = null;
+    FeHBaseTable tbl = (FeHBaseTable) desc_.getTable();
     List<HRegionLocation> regionsLoc;
     try {
-      hbaseTbl = tbl.getHBaseTable();
-      regionsLoc = HBaseTable.getRegionsInRange(hbaseTbl, startKey_, stopKey_);
-      hbaseTbl.close();
+      regionsLoc = FeHBaseTable.Util.getRegionsInRange(tbl, startKey_, stopKey_);
     } catch (IOException e) {
       throw new RuntimeException(
           "couldn't retrieve HBase table (" + tbl.getHBaseTableName() + ") info:\n"
@@ -403,7 +400,7 @@ public class HBaseScanNode extends ScanNode {
   @Override
   protected String getNodeExplainString(String prefix, String detailPrefix,
       TExplainLevel detailLevel) {
-    HBaseTable table = (HBaseTable) desc_.getTable();
+    FeHBaseTable table = (FeHBaseTable) desc_.getTable();
     StringBuilder output = new StringBuilder();
     if (isEmpty_) {
       output.append(prefix + "empty scan node\n");

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 9948840..1496d9a 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -32,9 +32,9 @@ import org.apache.impala.analysis.JoinOperator;
 import org.apache.impala.analysis.QueryStmt;
 import org.apache.impala.analysis.SortInfo;
 import org.apache.impala.analysis.TupleId;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.RuntimeEnv;
@@ -200,7 +200,7 @@ public class Planner {
             graph.addTargetColumnLabels(targetTable);
           }
           exprs.addAll(resultExprs);
-        } else if (targetTable instanceof HBaseTable) {
+        } else if (targetTable instanceof FeHBaseTable) {
           graph.addTargetColumnLabels(targetTable);
           exprs.addAll(resultExprs);
         } else {

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 45e2351..6b3f032 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -56,9 +56,9 @@ import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.FeDataSourceTable;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
@@ -1308,7 +1308,7 @@ public class SingleNodePlanner {
           conjuncts);
       scanNode.init(analyzer);
       return scanNode;
-    } else if (table instanceof HBaseTable) {
+    } else if (table instanceof FeHBaseTable) {
       // HBase table
       scanNode = new HBaseScanNode(ctx_.getNextNodeId(), tblRef.getDesc());
     } else if (tblRef.getTable() instanceof FeKuduTable) {

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/planner/TableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java
index 214c9b4..12cc4b0 100644
--- a/fe/src/main/java/org/apache/impala/planner/TableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java
@@ -21,10 +21,9 @@ import java.util.List;
 
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.HBaseTable;
-import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.thrift.TSinkAction;
 
 import com.google.common.base.Preconditions;
@@ -104,7 +103,7 @@ public abstract class TableSink extends DataSink {
       Preconditions.checkState(referencedColumns.isEmpty());
       return new HdfsTableSink(table, partitionKeyExprs, overwrite, inputIsClustered,
           sortColumns);
-    } else if (table instanceof HBaseTable) {
+    } else if (table instanceof FeHBaseTable) {
       // HBase only supports inserts.
       Preconditions.checkState(sinkAction == Op.INSERT);
       // Partition clause doesn't make sense for an HBase table.

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 1ff9d20..477d130 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -81,10 +81,10 @@ import org.apache.impala.catalog.FeDataSource;
 import org.apache.impala.catalog.FeDataSourceTable;
 import org.apache.impala.catalog.FeDb;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.Function;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.ImpaladCatalog;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
@@ -106,7 +106,6 @@ import org.apache.impala.thrift.TColumnValue;
 import org.apache.impala.thrift.TCommentOnParams;
 import org.apache.impala.thrift.TCreateDropRoleParams;
 import org.apache.impala.thrift.TDdlExecRequest;
-import org.apache.impala.thrift.TDdlExecResponse;
 import org.apache.impala.thrift.TDdlType;
 import org.apache.impala.thrift.TDescribeOutputStyle;
 import org.apache.impala.thrift.TDescribeResult;
@@ -746,8 +745,8 @@ public class Frontend {
     FeTable table = getCatalog().getTable(dbName, tableName);
     if (table instanceof FeFsTable) {
       return ((FeFsTable) table).getTableStats();
-    } else if (table instanceof HBaseTable) {
-      return ((HBaseTable) table).getTableStats();
+    } else if (table instanceof FeHBaseTable) {
+      return ((FeHBaseTable) table).getTableStats();
     } else if (table instanceof FeDataSourceTable) {
       return ((FeDataSourceTable) table).getTableStats();
     } else if (table instanceof FeKuduTable) {

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
index 30dfc5b..93ff5af 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
@@ -211,4 +211,32 @@ public class LocalCatalogTest {
         "STORED AS KUDU\n" +
         "TBLPROPERTIES"));
   }
+
+  @Test
+  public void testHbaseTable() throws Exception {
+    LocalHbaseTable t = (LocalHbaseTable) catalog_.getTable("functional_hbase",
+        "alltypes");
+    Assert.assertThat(ToSqlUtils.getCreateTableSql(t), CoreMatchers.startsWith(
+        "CREATE EXTERNAL TABLE functional_hbase.alltypes (\n" +
+        "  id INT COMMENT 'Add a comment',\n" +
+        "  bigint_col BIGINT,\n" +
+        "  bool_col BOOLEAN,\n" +
+        "  date_string_col STRING,\n" +
+        "  double_col DOUBLE,\n" +
+        "  float_col FLOAT,\n" +
+        "  int_col INT,\n" +
+        "  month INT,\n" +
+        "  smallint_col SMALLINT,\n" +
+        "  string_col STRING,\n" +
+        "  timestamp_col TIMESTAMP,\n" +
+        "  tinyint_col TINYINT,\n" +
+        "  year INT\n" +
+        ")\n" +
+        "STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'\n" +
+        "WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,d:bool_col,d:tinyint_col," +
+        "d:smallint_col,d:int_col,d:bigint_col,d:float_col,d:double_col," +
+        "d:date_string_col,d:string_col,d:timestamp_col,d:year,d:month', " +
+        "'serialization.format'='1')"
+    ));
+  }
 }


[2/5] impala git commit: IMPALA-7258: Support querying HBase tables in LocalCatalog

Posted by ta...@apache.org.
IMPALA-7258: Support querying HBase tables in LocalCatalog

This is a straightforward port expecting no behavior change. All of
the HBase E2E tests pass with this patch.

Change-Id: I8cc94bc38861443de5a375b7e63d29215e0ca899
Reviewed-on: http://gerrit.cloudera.org:8080/11079
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e6bf4dc2
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e6bf4dc2
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e6bf4dc2

Branch: refs/heads/master
Commit: e6bf4dc2a1c6205b9ae5d921702340ac0089370e
Parents: a0673c0
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Wed Jul 18 18:28:04 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Mon Aug 6 22:05:15 2018 +0000

----------------------------------------------------------------------
 .../analysis/AlterTableAddReplaceColsStmt.java  |   4 +-
 .../impala/analysis/AlterTableAlterColStmt.java |   4 +-
 .../impala/analysis/AlterTableDropColStmt.java  |   4 +-
 .../impala/analysis/AlterTableSetStmt.java      |   4 +-
 .../analysis/AlterTableSetTblProperties.java    |   4 +-
 .../impala/analysis/AlterTableSortByStmt.java   |   4 +-
 .../org/apache/impala/analysis/Analyzer.java    |   4 +-
 .../impala/analysis/ComputeStatsStmt.java       |   5 +-
 .../org/apache/impala/analysis/InsertStmt.java  |  15 +-
 .../org/apache/impala/analysis/ToSqlUtils.java  |   4 +-
 .../org/apache/impala/catalog/FeHBaseTable.java | 676 ++++++++++++++++
 .../org/apache/impala/catalog/HBaseTable.java   | 762 ++-----------------
 .../impala/catalog/local/LocalHbaseTable.java   | 115 +++
 .../apache/impala/catalog/local/LocalTable.java |   3 +-
 .../apache/impala/planner/HBaseScanNode.java    |  17 +-
 .../java/org/apache/impala/planner/Planner.java |   4 +-
 .../impala/planner/SingleNodePlanner.java       |   4 +-
 .../org/apache/impala/planner/TableSink.java    |   5 +-
 .../org/apache/impala/service/Frontend.java     |   7 +-
 .../impala/catalog/local/LocalCatalogTest.java  |  28 +
 20 files changed, 923 insertions(+), 750 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java
index 6abac52..0b110ae 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java
@@ -23,9 +23,9 @@ import java.util.Set;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 
 import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TAlterTableAddReplaceColsParams;
 import org.apache.impala.thrift.TAlterTableParams;
@@ -75,7 +75,7 @@ public class AlterTableAddReplaceColsStmt extends AlterTableStmt {
     FeTable t = getTargetTable();
     // TODO: Support column-level DDL on HBase tables. Requires updating the column
     // mappings along with the table columns.
-    if (t instanceof HBaseTable) {
+    if (t instanceof FeHBaseTable) {
       throw new AnalysisException("ALTER TABLE ADD|REPLACE COLUMNS not currently " +
           "supported on HBase tables.");
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java
index f7d8ce8..71577ce 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java
@@ -20,9 +20,9 @@ package org.apache.impala.analysis;
 import java.util.Map;
 
 import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TAlterTableAlterColParams;
@@ -101,7 +101,7 @@ public class AlterTableAlterColStmt extends AlterTableStmt {
   public void analyze(Analyzer analyzer) throws AnalysisException {
     super.analyze(analyzer);
     FeTable t = getTargetTable();
-    if (t instanceof HBaseTable) {
+    if (t instanceof FeHBaseTable) {
       throw new AnalysisException(
           "ALTER TABLE CHANGE/ALTER COLUMN not currently supported on HBase tables.");
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/analysis/AlterTableDropColStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableDropColStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableDropColStmt.java
index 94fbc8d..8b9e875 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableDropColStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableDropColStmt.java
@@ -18,8 +18,8 @@
 package org.apache.impala.analysis;
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TAlterTableDropColParams;
 import org.apache.impala.thrift.TAlterTableParams;
@@ -57,7 +57,7 @@ public class AlterTableDropColStmt extends AlterTableStmt {
     FeTable t = getTargetTable();
     // TODO: Support column-level DDL on HBase tables. Requires updating the column
     // mappings along with the table columns.
-    if (t instanceof HBaseTable) {
+    if (t instanceof FeHBaseTable) {
       throw new AnalysisException("ALTER TABLE DROP COLUMN not currently supported " +
           "on HBase tables.");
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/analysis/AlterTableSetStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetStmt.java
index 948a7aa..4d6a10a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetStmt.java
@@ -18,8 +18,8 @@
 package org.apache.impala.analysis;
 
 import org.apache.impala.authorization.Privilege;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.common.AnalysisException;
 
 /**
@@ -43,7 +43,7 @@ public class AlterTableSetStmt extends AlterTableStmt {
     // TODO: Support ALTER TABLE SET on HBase tables. Requires validating changes
     // to the SERDEPROPERTIES and TBLPROPERTIES to ensure the table metadata does not
     // become invalid.
-    if (t instanceof HBaseTable) {
+    if (t instanceof FeHBaseTable) {
       throw new AnalysisException("ALTER TABLE SET not currently supported on " +
           "HBase tables.");
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
index 1f02390..8995772 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
@@ -27,9 +27,9 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 import org.apache.impala.authorization.PrivilegeRequestBuilder;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.Table;
@@ -210,7 +210,7 @@ public class AlterTableSetTblProperties extends AlterTableSetStmt {
 
     // ALTER TABLE SET is not supported on HBase tables at all, see
     // AlterTableSetStmt::analyze().
-    Preconditions.checkState(!(table instanceof HBaseTable));
+    Preconditions.checkState(!(table instanceof FeHBaseTable));
 
     if (table instanceof FeKuduTable) {
       throw new AnalysisException(String.format("'%s' table property is not supported " +

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java
index 0f99c9b..8d45f55 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java
@@ -20,9 +20,9 @@ package org.apache.impala.analysis;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TAlterTableParams;
 import org.apache.impala.thrift.TAlterTableSetTblPropertiesParams;
@@ -68,7 +68,7 @@ public class AlterTableSortByStmt extends AlterTableStmt {
 
     // Disallow setting sort columns on HBase and Kudu tables.
     FeTable targetTable = getTargetTable();
-    if (targetTable instanceof HBaseTable) {
+    if (targetTable instanceof FeHBaseTable) {
       throw new AnalysisException("ALTER TABLE SORT BY not supported on HBase tables.");
     }
     if (targetTable instanceof FeKuduTable) {

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index e902383..6468513 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -43,10 +43,10 @@ import org.apache.impala.catalog.FeCatalog;
 import org.apache.impala.catalog.FeDataSourceTable;
 import org.apache.impala.catalog.FeDb;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.IncompleteTable;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
@@ -589,7 +589,7 @@ public class Analyzer {
       // The table must be a base table.
       Preconditions.checkState(table instanceof FeFsTable ||
           table instanceof FeKuduTable ||
-          table instanceof HBaseTable ||
+          table instanceof FeHBaseTable ||
           table instanceof FeDataSourceTable);
       return new BaseTableRef(tableRef, resolvedPath);
     } else {

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index 9ce7b0d..52c7a15 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -30,8 +30,8 @@ import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.HdfsTable;
@@ -217,7 +217,8 @@ public class ComputeStatsStmt extends StatementBase {
     // For Hdfs tables, exclude partition columns from stats gathering because Hive
     // cannot store them as part of the non-partition column stats. For HBase tables,
     // include the single clustering column (the row key).
-    int startColIdx = (table_ instanceof HBaseTable) ? 0 : table_.getNumClusteringCols();
+    int startColIdx = (table_ instanceof FeHBaseTable) ? 0 :
+        table_.getNumClusteringCols();
 
     for (int i = startColIdx; i < table_.getColumns().size(); ++i) {
       Column c = table_.getColumns().get(i);

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 55da237..9998b0a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -27,10 +27,10 @@ import org.apache.impala.authorization.Privilege;
 import org.apache.impala.authorization.PrivilegeRequestBuilder;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.View;
@@ -283,7 +283,7 @@ public class InsertStmt extends StatementBase {
     // Also checks if the target table is missing.
     analyzeTargetTable(analyzer);
 
-    boolean isHBaseTable = (table_ instanceof HBaseTable);
+    boolean isHBaseTable = (table_ instanceof FeHBaseTable);
     int numClusteringCols = isHBaseTable ? 0 : table_.getNumClusteringCols();
 
     // Analysis of the INSERT statement from this point is basically the act of matching
@@ -459,7 +459,7 @@ public class InsertStmt extends StatementBase {
    * - Overwrite is invalid for HBase and Kudu tables
    */
   private void analyzeTableForInsert(Analyzer analyzer) throws AnalysisException {
-    boolean isHBaseTable = (table_ instanceof HBaseTable);
+    boolean isHBaseTable = (table_ instanceof FeHBaseTable);
     int numClusteringCols = isHBaseTable ? 0 : table_.getNumClusteringCols();
 
     if (partitionKeyValues_ != null && numClusteringCols == 0) {
@@ -539,7 +539,7 @@ public class InsertStmt extends StatementBase {
       // exists. So all columns aren't mentioned in the query.
       if (table_ instanceof FeKuduTable) {
         checkRequiredKuduColumns(mentionedColumnNames);
-      } else if (table_ instanceof HBaseTable) {
+      } else if (table_ instanceof FeHBaseTable) {
         checkRequiredHBaseColumns(mentionedColumnNames);
       } else if (table_.getNumClusteringCols() > 0) {
         checkRequiredPartitionedColumns(mentionedColumnNames);
@@ -605,7 +605,7 @@ public class InsertStmt extends StatementBase {
    */
   private void checkRequiredHBaseColumns(Set<String> mentionedColumnNames)
       throws AnalysisException {
-    Preconditions.checkState(table_ instanceof HBaseTable);
+    Preconditions.checkState(table_ instanceof FeHBaseTable);
     Column column = table_.getColumns().get(0);
     if (!mentionedColumnNames.contains(column.getName())) {
       throw new AnalysisException("Row-key column '" + column.getName() +
@@ -660,7 +660,8 @@ public class InsertStmt extends StatementBase {
     List<Expr> tmpPartitionKeyExprs = new ArrayList<Expr>();
     List<String> tmpPartitionKeyNames = new ArrayList<String>();
 
-    int numClusteringCols = (tbl instanceof HBaseTable) ? 0 : tbl.getNumClusteringCols();
+    int numClusteringCols = (tbl instanceof FeHBaseTable) ? 0
+        : tbl.getNumClusteringCols();
     boolean isKuduTable = table_ instanceof FeKuduTable;
     Set<String> kuduPartitionColumnNames = null;
     if (isKuduTable) {
@@ -809,7 +810,7 @@ public class InsertStmt extends StatementBase {
 
   private void analyzePlanHints(Analyzer analyzer) throws AnalysisException {
     if (planHints_.isEmpty()) return;
-    if (table_ instanceof HBaseTable) {
+    if (table_ instanceof FeHBaseTable) {
       throw new AnalysisException(String.format("INSERT hints are only supported for " +
           "inserting into Hdfs and Kudu tables: %s", getTargetTableName()));
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index dca6b94..6e5ad6a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -42,11 +42,11 @@ import org.apache.hadoop.hive.ql.parse.HiveLexer;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
 import org.apache.impala.catalog.Function;
-import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.KuduColumn;
@@ -240,7 +240,7 @@ public class ToSqlUtils {
     removeHiddenTableProperties(properties, Maps.<String, String>newHashMap());
     ArrayList<String> colsSql = Lists.newArrayList();
     ArrayList<String> partitionColsSql = Lists.newArrayList();
-    boolean isHbaseTable = table instanceof HBaseTable;
+    boolean isHbaseTable = table instanceof FeHBaseTable;
     for (int i = 0; i < table.getColumns().size(); i++) {
       if (!isHbaseTable && i < table.getNumClusteringCols()) {
         partitionColsSql.add(columnToSql(table.getColumns().get(i)));

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/catalog/FeHBaseTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeHBaseTable.java b/fe/src/main/java/org/apache/impala/catalog/FeHBaseTable.java
new file mode 100644
index 0000000..2b26dd5
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/FeHBaseTable.java
@@ -0,0 +1,676 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.impala.common.Pair;
+import org.apache.impala.thrift.TColumn;
+import org.apache.impala.thrift.THBaseTable;
+import org.apache.impala.thrift.TResultSet;
+import org.apache.impala.thrift.TResultSetMetadata;
+import org.apache.impala.util.StatsHelper;
+import org.apache.impala.util.TResultRowBuilder;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public interface FeHBaseTable extends FeTable {
+  /**
+   * @see Util#getEstimatedRowStats(byte[], byte[])
+   */
+  Pair<Long, Long> getEstimatedRowStats(byte[] startRowKey, byte[] endRowKey);
+
+  /**
+   * @see Util#getHBaseTableName(Table)
+   */
+  String getHBaseTableName();
+
+  /**
+   * @see Util#getTableStats(FeHBaseTable)
+   */
+  TResultSet getTableStats();
+
+  /**
+   * Implementations may want to cache column families. This getter is for static
+     * functions in {@link Util} to access that potentially cached data.
+   */
+  HColumnDescriptor[] getColumnFamilies() throws IOException;
+
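  // Illustration (not part of the patch): per the note on Util below, once the codebase
  // targets Java 8 these delegating members could live in the interface itself, e.g.
  //
  //   default TResultSet getTableStats() { return Util.getTableStats(this); }
  //
  // so each implementation would no longer need its own one-line forwarding method.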
+  /**
+   * Utility functions for acting on FeHBaseTable.
+   * When we fully move to Java 8, these can become default methods of the interface.
+   */
+  abstract class Util {
+    // Storage handler class for HBase tables read by Hive.
+    public static final String HBASE_STORAGE_HANDLER =
+        "org.apache.hadoop.hive.hbase.HBaseStorageHandler";
+    // Column family of HBase row key
+    static final String ROW_KEY_COLUMN_FAMILY = ":key";
+    // Copied from Hive's HBaseStorageHandler.java.
+    static final String DEFAULT_PREFIX = "default.";
+    // Number of rows fetched per region during row count estimation
+    static final int ROW_COUNT_ESTIMATE_BATCH_SIZE = 10;
+    // Keep the conf around
+    static final Configuration HBASE_CONF = HBaseConfiguration.create();
+    private static final Logger LOG = Logger.getLogger(FeHBaseTable.class);
+    // Maximum deviation from the average to stop querying more regions
+    // to estimate the row count
+    private static final double DELTA_FROM_AVERAGE = 0.15;
+    // Minimum number of regions that are checked to estimate the row count
+    private static final int MIN_NUM_REGIONS_TO_CHECK = 5;
+
+    /**
+     * Table client objects are thread-unsafe and cheap to create. The HBase docs
+     * recommend creating a new one for each task and then closing when done.
+     */
+    public static org.apache.hadoop.hbase.client.Table getHBaseTable(
+        String hbaseTableName) throws IOException {
+      return ConnectionHolder.getConnection().getTable(TableName.valueOf(hbaseTableName));
+    }
+
+    static org.apache.hadoop.hbase.client.Table getHBaseTable(FeHBaseTable tbl)
+        throws IOException {
+      return getHBaseTable(tbl.getHBaseTableName());
+    }
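    // Illustration (not part of the patch) of the per-task usage pattern described above:
    // the shared Connection stays open, while the lightweight Table handle is created,
    // used on a single thread, and closed promptly. try-with-resources makes the close
    // explicit:
    //
    //   try (org.apache.hadoop.hbase.client.Table t = Util.getHBaseTable("my_hbase_table")) {
    //     // issue Gets/Scans against 't' from this thread only
    //   }
    //
    // where "my_hbase_table" is a placeholder table name.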
+
+    /**
+     * Load columns from msTable in hive order. No IO is involved.
+     */
+    public static List<Column> loadColumns(
+        org.apache.hadoop.hive.metastore.api.Table msTable)
+        throws MetaException, SerDeException {
+      Map<String, String> serdeParams = msTable.getSd().getSerdeInfo().getParameters();
+      String hbaseColumnsMapping = serdeParams.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+      if (hbaseColumnsMapping == null) {
+        throw new MetaException("No hbase.columns.mapping defined in Serde.");
+      }
+
+      String hbaseTableDefaultStorageType =
+          msTable.getParameters().get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE);
+      boolean tableDefaultStorageIsBinary = false;
+      if (hbaseTableDefaultStorageType != null &&
+          !hbaseTableDefaultStorageType.isEmpty()) {
+        if (hbaseTableDefaultStorageType.equalsIgnoreCase("binary")) {
+          tableDefaultStorageIsBinary = true;
+        } else if (!hbaseTableDefaultStorageType.equalsIgnoreCase("string")) {
+          throw new SerDeException(
+              "Error: " + HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE +
+                  " parameter must be specified as" + " 'string' or 'binary'; '" +
+                  hbaseTableDefaultStorageType +
+                  "' is not a valid specification for this table/serde property.");
+        }
+      }
+
+      // Parse HBase column-mapping string.
+      List<FieldSchema> fieldSchemas = msTable.getSd().getCols();
+      List<String> hbaseColumnFamilies = new ArrayList<>();
+      List<String> hbaseColumnQualifiers = new ArrayList<>();
+      List<Boolean> hbaseColumnBinaryEncodings = new ArrayList<>();
+      parseColumnMapping(tableDefaultStorageIsBinary, hbaseColumnsMapping,
+          msTable.getTableName(), fieldSchemas, hbaseColumnFamilies,
+          hbaseColumnQualifiers, hbaseColumnBinaryEncodings);
+      Preconditions
+          .checkState(hbaseColumnFamilies.size() == hbaseColumnQualifiers.size());
+      Preconditions.checkState(fieldSchemas.size() == hbaseColumnFamilies.size());
+
+      // Populate tmp cols in the order they appear in the Hive metastore.
+      // We will reorder the cols below.
+      List<HBaseColumn> tmpCols = Lists.newArrayList();
+      // Store the key column separately.
+      // TODO: Change this to an ArrayList once we support composite row keys.
+      HBaseColumn keyCol = null;
+      for (int i = 0; i < fieldSchemas.size(); ++i) {
+        FieldSchema s = fieldSchemas.get(i);
+        Type t = Type.INVALID;
+        try {
+          t = FeCatalogUtils.parseColumnType(s, msTable.getTableName());
+        } catch (TableLoadingException e) {
+          // Ignore hbase types we don't support yet. We can load the metadata
+          // but won't be able to select from it.
+        }
+        HBaseColumn col = new HBaseColumn(s.getName(), hbaseColumnFamilies.get(i),
+            hbaseColumnQualifiers.get(i), hbaseColumnBinaryEncodings.get(i), t,
+            s.getComment(), -1);
+        if (col.getColumnFamily().equals(ROW_KEY_COLUMN_FAMILY)) {
+          // Store the row key column separately from the rest
+          keyCol = col;
+        } else {
+          tmpCols.add(col);
+        }
+      }
+      Preconditions.checkState(keyCol != null);
+      // The backend assumes that the row key column is always first and
+      // that the remaining HBase columns are ordered by columnFamily,columnQualifier,
+      // so the final position depends on the other mapped HBase columns.
+      // Sort columns and update positions.
+      Collections.sort(tmpCols);
+      keyCol.setPosition(0);
+      List<Column> cols = new ArrayList<>();
+      cols.add(keyCol);
+      // Update the positions of the remaining columns
+      for (int i = 0; i < tmpCols.size(); ++i) {
+        HBaseColumn col = tmpCols.get(i);
+        col.setPosition(i + 1);
+        cols.add(col);
+      }
+      return cols;
+    }
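    // Worked example (illustration only): functional_hbase.alltypes maps every non-key
    // column into family 'd' with the qualifier equal to the column name
    // (':key,d:bool_col,d:tinyint_col,...,d:year,d:month'). After the sort above, the
    // row key column comes first and the remaining columns are ordered by
    // (family, qualifier); that is why the CREATE TABLE output checked in
    // LocalCatalogTest#testHbaseTable lists them alphabetically (bigint_col, bool_col,
    // ..., year).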
+
+    /**
+     * Parse the column description string into the column families and column
+     * qualifiers. This is a copy of HBaseSerDe.parseColumnMapping and
+     * parseColumnStorageTypes with parts we don't use removed. The Hive functions
+     * are not public.
+     * tableDefaultStorageIsBinary - true if the table defaults to binary encoding
+     * columnsMappingSpec - input string format describing the table
+     * fieldSchemas - input field schema from metastore table
+     * columnFamilies/columnQualifiers/colIsBinaryEncoded - out parameters that will be
+     * filled with the column family, column qualifier and encoding for each column.
+     */
+    static void parseColumnMapping(boolean tableDefaultStorageIsBinary,
+        String columnsMappingSpec, String tblName, List<FieldSchema> fieldSchemas,
+        List<String> columnFamilies, List<String> columnQualifiers,
+        List<Boolean> colIsBinaryEncoded) throws SerDeException {
+      if (columnsMappingSpec == null) {
+        throw new SerDeException(
+            "Error: hbase.columns.mapping missing for this HBase table.");
+      }
+
+      if (columnsMappingSpec.equals("") ||
+          columnsMappingSpec.equals(HBaseSerDe.HBASE_KEY_COL)) {
+        throw new SerDeException("Error: hbase.columns.mapping specifies only " +
+            "the HBase table row key. A valid Hive-HBase table must specify at " +
+            "least one additional column.");
+      }
+
+      int rowKeyIndex = -1;
+      String[] columnSpecs = columnsMappingSpec.split(",");
+      // If there was an implicit key column mapping, the number of columns (fieldSchemas)
+      // will be one more than the number of column mapping specs.
+      int fsStartIdxOffset = fieldSchemas.size() - columnSpecs.length;
+      if (fsStartIdxOffset != 0 && fsStartIdxOffset != 1) {
+        // This should never happen - Hive blocks creating a mismatched table and both
+        // Hive and Impala currently block all column-level DDL on HBase tables.
+        throw new SerDeException(String.format("Number of entries in " +
+                "'hbase.columns.mapping' does not match the number of columns in the " +
+                "table: %d != %d (counting the key if implicit)", columnSpecs.length,
+            fieldSchemas.size()));
+      }
+
+      for (int i = 0; i < columnSpecs.length; ++i) {
+        String mappingSpec = columnSpecs[i];
+        String[] mapInfo = mappingSpec.split("#");
+        // Trim column info so that serdeproperties with new lines still parse correctly.
+        String colInfo = mapInfo[0].trim();
+
+        int idxFirst = colInfo.indexOf(":");
+        int idxLast = colInfo.lastIndexOf(":");
+
+        if (idxFirst < 0 || !(idxFirst == idxLast)) {
+          throw new SerDeException("Error: the HBase columns mapping contains a " +
+              "badly formed column family, column qualifier specification.");
+        }
+
+        if (colInfo.equals(HBaseSerDe.HBASE_KEY_COL)) {
+          Preconditions.checkState(fsStartIdxOffset == 0);
+          rowKeyIndex = i;
+          columnFamilies.add(colInfo);
+          columnQualifiers.add(null);
+        } else {
+          String[] parts = colInfo.split(":");
+          Preconditions.checkState(parts.length > 0 && parts.length <= 2);
+          columnFamilies.add(parts[0]);
+          if (parts.length == 2) {
+            columnQualifiers.add(parts[1]);
+          } else {
+            columnQualifiers.add(null);
+          }
+        }
+
+        // Set column binary encoding
+        FieldSchema fieldSchema = fieldSchemas.get(i + fsStartIdxOffset);
+        boolean supportsBinaryEncoding = supportsBinaryEncoding(fieldSchema, tblName);
+        if (mapInfo.length == 1) {
+          // There is no column level storage specification. Use the table storage spec.
+          colIsBinaryEncoded.add(tableDefaultStorageIsBinary && supportsBinaryEncoding);
+        } else if (mapInfo.length == 2) {
+          // There is a storage specification for the column
+          String storageOption = mapInfo[1];
+
+          if (!(storageOption.equals("-") || "string".startsWith(storageOption) ||
+              "binary".startsWith(storageOption))) {
+            throw new SerDeException("Error: A column storage specification is one of" +
+                " the following: '-', a prefix of 'string', or a prefix of 'binary'. " +
+                storageOption + " is not a valid storage option specification for " +
+                fieldSchema.getName());
+          }
+
+          boolean isBinaryEncoded = false;
+          if ("-".equals(storageOption)) {
+            isBinaryEncoded = tableDefaultStorageIsBinary;
+          } else if ("binary".startsWith(storageOption)) {
+            isBinaryEncoded = true;
+          }
+          if (isBinaryEncoded && !supportsBinaryEncoding) {
+            // Use string encoding and log a warning if the column spec is binary but the
+            // column type does not support it.
+            // TODO: Hive/HBase does not raise an exception, but should we?
+            LOG.warn("Column storage specification for column " + fieldSchema.getName() +
+                " is binary" + " but the column type " + fieldSchema.getType() +
+                " does not support binary encoding. Fallback to string format.");
+            isBinaryEncoded = false;
+          }
+          colIsBinaryEncoded.add(isBinaryEncoded);
+        } else {
+          // error in storage specification
+          throw new SerDeException(
+              "Error: " + HBaseSerDe.HBASE_COLUMNS_MAPPING + " storage specification " +
+                  mappingSpec + " is not valid for column: " + fieldSchema.getName());
+        }
+      }
+
+      if (rowKeyIndex == -1) {
+        columnFamilies.add(0, HBaseSerDe.HBASE_KEY_COL);
+        columnQualifiers.add(0, null);
+        colIsBinaryEncoded.add(0, supportsBinaryEncoding(fieldSchemas.get(0), tblName) &&
+            tableDefaultStorageIsBinary);
+      }
+    }
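    // Worked example (illustration only, using a made-up table): for
    //   'hbase.columns.mapping' = ':key,d:int_col#b,d:string_col'
    // and Hive columns (id STRING, int_col INT, string_col STRING), the loop above
    // produces, per column:
    //   columnFamilies     = [":key", "d",        "d"          ]
    //   columnQualifiers   = [null,   "int_col",  "string_col" ]
    //   colIsBinaryEncoded = [false,  true,       false        ]
    // The '#b' suffix forces binary encoding for int_col; the other columns fall back
    // to the table-level default, assumed here to be 'string'.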
+
+
+    /**
+     * Get an estimate of the number of rows and bytes per row in regions between
+     * startRowKey and endRowKey.
+     * <p>
+     * This number is calculated by incrementally checking as many region servers as
+     * necessary until we observe a relatively constant row size per region on average.
+     * Depending on the skew of data in the regions this can either mean that we need
+     * to check only a minimal number of regions or that we will scan all regions.
+     * <p>
+     * The HBase region servers periodically update the master with their metrics,
+     * including storefile size. We get the size of the storefiles for all regions in
+     * the cluster with a single call to getClusterStatus from the master.
+     * <p>
+     * The accuracy of this estimate is determined by the number of rows that have
+     * been written to the memstore but not yet flushed. A large number of key-value
+     * pairs still in the memstore will lead to bad estimates, because they are not
+     * reflected in the storefile size that the estimate is based on.
+     * <p>
+     * Currently, the algorithm does not consider the case that the key range used as a
+     * parameter might be generally of different size than the rest of the region.
+     * <p>
+     * The values computed here should be cached so that in high qps workloads
+     * the nn is not overwhelmed. Could be done in load(); Synchronized to make
+     * sure that only one thread at a time is using the htable.
+     *
+     * @param startRowKey First row key in the range
+     * @param endRowKey   Last row key in the range
+     * @return The estimated number of rows in the regions between the row keys (first)
+     * and the estimated row size in bytes (second).
+     */
+    public static Pair<Long, Long> getEstimatedRowStats(FeHBaseTable tbl,
+        byte[] startRowKey, byte[] endRowKey) {
+      Preconditions.checkNotNull(startRowKey);
+      Preconditions.checkNotNull(endRowKey);
+
+      boolean isCompressed = false;
+      long rowCount;
+      long rowSize;
+      try {
+        ClusterStatus clusterStatus = getClusterStatus();
+        // Check to see if things are compressed.
+        // If they are we'll estimate a compression factor.
+        HColumnDescriptor[] columnFamilies = tbl.getColumnFamilies();
+        Preconditions.checkNotNull(columnFamilies);
+        for (HColumnDescriptor desc : columnFamilies) {
+          isCompressed |= desc.getCompression() != Compression.Algorithm.NONE;
+        }
+
+        // Fetch all regions for the key range
+        List<HRegionLocation> locations = getRegionsInRange(tbl, startRowKey, endRowKey);
+        Collections.shuffle(locations);
+        // The following variables track the number and size of 'rows' in
+        // HBase and allow incremental calculation of the average and standard
+        // deviation.
+        StatsHelper<Long> statsSize = new StatsHelper<>();
+        long totalEstimatedRows = 0;
+
+        // Collects stats samples from at least MIN_NUM_REGIONS_TO_CHECK
+        // and at most all regions until the delta is small enough.
+        while ((statsSize.count() < MIN_NUM_REGIONS_TO_CHECK ||
+            statsSize.stddev() > statsSize.mean() * DELTA_FROM_AVERAGE) &&
+            statsSize.count() < locations.size()) {
+          HRegionLocation currentLocation = locations.get((int) statsSize.count());
+          Pair<Long, Long> tmp =
+              getEstimatedRowStatsForRegion(tbl, currentLocation, isCompressed,
+                  clusterStatus);
+          totalEstimatedRows += tmp.first;
+          statsSize.addSample(tmp.second);
+        }
+
+        // Sum up the total size for all regions in range.
+        long totalSize = 0;
+        for (final HRegionLocation location : locations) {
+          totalSize += getRegionSize(location, clusterStatus);
+        }
+        if (totalSize == 0) {
+          rowCount = totalEstimatedRows;
+        } else {
+          rowCount = (long) (totalSize / statsSize.mean());
+        }
+        rowSize = (long) statsSize.mean();
+        return new Pair<>(rowCount, rowSize);
+      } catch (IOException ioe) {
+        // Log the error, but otherwise ignore it as this is just an estimate.
+        // TODO: Put this into the per-query log.
+        LOG.error("Error computing HBase row count estimate", ioe);
+        return new Pair<>(-1L, -1L);
+      }
+    }
+
+    /**
+     * Returns statistics on this table as a tabular result set. Used for the
+     * SHOW TABLE STATS statement. The schema of the returned TResultSet is set
+     * inside this method.
+     */
+    public static TResultSet getTableStats(FeHBaseTable tbl) {
+      TResultSet result = new TResultSet();
+      TResultSetMetadata resultSchema = new TResultSetMetadata();
+      result.setSchema(resultSchema);
+      resultSchema.addToColumns(new TColumn("Region Location", Type.STRING.toThrift()));
+      resultSchema.addToColumns(new TColumn("Start RowKey", Type.STRING.toThrift()));
+      resultSchema.addToColumns(new TColumn("Est. #Rows", Type.BIGINT.toThrift()));
+      resultSchema.addToColumns(new TColumn("Size", Type.STRING.toThrift()));
+
+      // TODO: Consider fancier stats maintenance techniques for speeding up this process.
+      // Currently, we list all regions and perform a mini-scan of each of them to
+      // estimate the number of rows, the data size, etc., which is rather expensive.
+      try {
+        ClusterStatus clusterStatus = getClusterStatus();
+        long totalNumRows = 0;
+        long totalSize = 0;
+        List<HRegionLocation> regions =
+            getRegionsInRange(tbl, HConstants.EMPTY_END_ROW, HConstants.EMPTY_START_ROW);
+        for (HRegionLocation region : regions) {
+          TResultRowBuilder rowBuilder = new TResultRowBuilder();
+          HRegionInfo regionInfo = region.getRegionInfo();
+          Pair<Long, Long> estRowStats =
+              getEstimatedRowStatsForRegion(tbl, region, false, clusterStatus);
+
+          long numRows = estRowStats.first;
+          long regionSize = getRegionSize(region, clusterStatus);
+          totalNumRows += numRows;
+          totalSize += regionSize;
+
+          // Add the region location, start rowkey, number of rows and raw size.
+          rowBuilder.add(String.valueOf(region.getHostname()))
+              .add(Bytes.toString(regionInfo.getStartKey())).add(numRows)
+              .addBytes(regionSize);
+          result.addToRows(rowBuilder.get());
+        }
+
+        // Total num rows and raw region size.
+        if (regions.size() > 1) {
+          TResultRowBuilder rowBuilder = new TResultRowBuilder();
+          rowBuilder.add("Total").add("").add(totalNumRows).addBytes(totalSize);
+          result.addToRows(rowBuilder.get());
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return result;
+    }
+
+
+    /**
+     * This method is completely copied from Hive's HBaseStorageHandler.java.
+     */
+    public static String getHBaseTableName(
+        org.apache.hadoop.hive.metastore.api.Table tbl) {
+      // Give preference to TBLPROPERTIES over SERDEPROPERTIES
+      // (really we should only use TBLPROPERTIES, so this is just
+      // for backwards compatibility with the original specs).
+      String tableName = tbl.getParameters().get(HBaseSerDe.HBASE_TABLE_NAME);
+      if (tableName == null) {
+        tableName =
+            tbl.getSd().getSerdeInfo().getParameters().get(HBaseSerDe.HBASE_TABLE_NAME);
+      }
+      if (tableName == null) {
+        tableName = tbl.getDbName() + "." + tbl.getTableName();
+        if (tableName.startsWith(DEFAULT_PREFIX)) {
+          tableName = tableName.substring(DEFAULT_PREFIX.length());
+        }
+      }
+      return tableName;
+    }
+
+    /**
+     * Estimates the number of rows for a single region and returns a pair with
+     * the estimated row count and the estimated size in bytes per row.
+     */
+    private static Pair<Long, Long> getEstimatedRowStatsForRegion(FeHBaseTable tbl,
+        HRegionLocation location, boolean isCompressed, ClusterStatus clusterStatus)
+        throws IOException {
+      HRegionInfo info = location.getRegionInfo();
+
+      Scan s = new Scan(info.getStartKey());
+      // Get a small sample of rows
+      s.setBatch(ROW_COUNT_ESTIMATE_BATCH_SIZE);
+      // Try and get every version so the row's size can be used to estimate.
+      s.setMaxVersions(Short.MAX_VALUE);
+      // Don't cache the blocks as we don't think these are
+      // necessarily important blocks.
+      s.setCacheBlocks(false);
+      // Try and get deletes too so their size can be counted.
+      s.setRaw(false);
+
+      long currentRowSize = 0;
+      long currentRowCount = 0;
+
+      try (org.apache.hadoop.hbase.client.Table table = getHBaseTable(tbl);
+          ResultScanner rs = table.getScanner(s)) {
+        // Get the first ROW_COUNT_ESTIMATE_BATCH_SIZE fetched rows
+        // for a representative sample.
+        for (int i = 0; i < ROW_COUNT_ESTIMATE_BATCH_SIZE; ++i) {
+          Result r = rs.next();
+          if (r == null) break;
+          // Check for empty rows, see IMPALA-1451
+          if (r.isEmpty()) continue;
+          ++currentRowCount;
+          // To estimate the number of rows we simply use the number of bytes
+          // returned from the underlying buffer. Since HBase internally works
+          // with these structures as well, this gives us reasonable estimates.
+          Cell[] cells = r.rawCells();
+          for (Cell c : cells) {
+            if (c instanceof KeyValue) {
+              currentRowSize += KeyValue
+                  .getKeyValueDataStructureSize(c.getRowLength(), c.getFamilyLength(),
+                      c.getQualifierLength(), c.getValueLength(), c.getTagsLength());
+            } else {
+              throw new IllegalStateException(
+                  "Celltype " + c.getClass().getName() + " not supported.");
+            }
+          }
+        }
+      }
+
+      // If there are no rows, there is no need to estimate.
+      if (currentRowCount == 0) return new Pair<>(0L, 0L);
+      // Get the size.
+      long currentSize = getRegionSize(location, clusterStatus);
+      // Estimate the number of rows.
+      double bytesPerRow = currentRowSize / (double) currentRowCount;
+      if (currentSize == 0) {
+        return new Pair<>(currentRowCount, (long) bytesPerRow);
+      }
+
+      // A compression factor of two is only a best-effort guess.
+      long estimatedRowCount =
+          (long) ((isCompressed ? 2 : 1) * (currentSize / bytesPerRow));
+
+      return new Pair<>(estimatedRowCount, (long) bytesPerRow);
+    }
+
+
+    /**
+     * Returns the size of the given region in bytes. Simply returns the storefile size
+     * for this region from the ClusterStatus. Returns 0 in case of an error.
+     */
+    private static long getRegionSize(HRegionLocation location,
+        ClusterStatus clusterStatus) {
+      HRegionInfo info = location.getRegionInfo();
+      ServerLoad serverLoad = clusterStatus.getLoad(location.getServerName());
+
+      // If the serverLoad is null, the master doesn't have information for this region's
+      // server. This shouldn't normally happen.
+      if (serverLoad == null) {
+        LOG.error("Unable to find server load for server: " + location.getServerName() +
+            " for location " + info.getRegionNameAsString());
+        return 0;
+      }
+      RegionLoad regionLoad = serverLoad.getRegionsLoad().get(info.getRegionName());
+      if (regionLoad == null) {
+        LOG.error("Unable to find regions load for server: " + location.getServerName() +
+            " for location " + info.getRegionNameAsString());
+        return 0;
+      }
+      final long megaByte = 1024L * 1024L;
+      return regionLoad.getStorefileSizeMB() * megaByte;
+    }
+
+    public static THBaseTable getTHBaseTable(FeHBaseTable table) {
+      THBaseTable tHbaseTable = new THBaseTable();
+      tHbaseTable.setTableName(table.getHBaseTableName());
+      for (Column c : table.getColumns()) {
+        HBaseColumn hbaseCol = (HBaseColumn) c;
+        tHbaseTable.addToFamilies(hbaseCol.getColumnFamily());
+        if (hbaseCol.getColumnQualifier() != null) {
+          tHbaseTable.addToQualifiers(hbaseCol.getColumnQualifier());
+        } else {
+          tHbaseTable.addToQualifiers("");
+        }
+        tHbaseTable.addToBinary_encoded(hbaseCol.isBinaryEncoded());
+      }
+      return tHbaseTable;
+    }
+
+    /**
+     * Get the corresponding regions for an arbitrary range of keys.
+     * This is copied from org.apache.hadoop.hbase.client.HTable in HBase 0.95. The
+     * difference is that it does not use the cache when calling getRegionLocation.
+     *
+     * @param tbl      An FeHBaseTable in the catalog
+     * @param startKey Starting key in range, inclusive
+     * @param endKey   Ending key in range, exclusive
+     * @return A list of HRegionLocations corresponding to the regions that
+     * contain the specified range
+     * @throws IOException if a remote or network exception occurs
+     */
+    public static List<HRegionLocation> getRegionsInRange(FeHBaseTable tbl,
+        final byte[] startKey, final byte[] endKey) throws IOException {
+      try (org.apache.hadoop.hbase.client.Table hbaseTbl = getHBaseTable(tbl)) {
+        final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
+        if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
+          throw new IllegalArgumentException(
+              "Invalid range: " + Bytes.toStringBinary(startKey) + " > " +
+                  Bytes.toStringBinary(endKey));
+        }
+        final List<HRegionLocation> regionList = new ArrayList<>();
+        byte[] currentKey = startKey;
+        Connection connection = ConnectionHolder.getConnection();
+        RegionLocator locator = connection.getRegionLocator(hbaseTbl.getName());
+        do {
+          // always reload region location info.
+          HRegionLocation regionLocation = locator.getRegionLocation(currentKey, true);
+          regionList.add(regionLocation);
+          currentKey = regionLocation.getRegionInfo().getEndKey();
+        } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) &&
+            (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0));
+        return regionList;
+      }
+    }
+
+    /**
+     * Get the cluster status, making sure we close the admin client afterwards.
+     */
+    static ClusterStatus getClusterStatus() throws IOException {
+      try (Admin admin = ConnectionHolder.getConnection().getAdmin()) {
+        return admin.getClusterStatus();
+      }
+    }
+
+    private static boolean supportsBinaryEncoding(FieldSchema fs, String tblName) {
+      try {
+        Type colType = FeCatalogUtils.parseColumnType(fs, tblName);
+        // Only boolean, integer and floating point types can use binary storage.
+        return colType.isBoolean() || colType.isIntegerType() ||
+            colType.isFloatingPointType();
+      } catch (TableLoadingException e) {
+        return false;
+      }
+    }
+
+    /**
+     * Connection instances are expensive to create. The HBase documentation recommends
+     * creating one and then sharing it among threads. All operations on a connection are
+     * thread-safe.
+     */
+    static class ConnectionHolder {
+      private static Connection connection_ = null;
+
+      static synchronized Connection getConnection() throws IOException {
+        if (connection_ == null || connection_.isClosed()) {
+          connection_ = ConnectionFactory.createConnection(Util.HBASE_CONF);
+        }
+        return connection_;
+      }
+    }
+  }
+}
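
For reference, the following is a minimal sketch of how frontend code might call the new
static row-stats helper, assuming (as in this patch) that Util is the static helper class
nested in FeHBaseTable. The class name RowStatsExample, the method printEstimates, and the
"row-000"/"row-999" keys are illustrative only and are not part of the patch.

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.impala.catalog.FeHBaseTable;
import org.apache.impala.common.Pair;

public class RowStatsExample {
  // Prints row-count and row-size estimates for the whole table and for a
  // bounded key range of the given FeHBaseTable.
  public static void printEstimates(FeHBaseTable tbl) {
    // Empty start/end row keys cover every region of the table.
    Pair<Long, Long> whole = FeHBaseTable.Util.getEstimatedRowStats(
        tbl, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
    // A bounded range only samples the regions overlapping [row-000, row-999).
    Pair<Long, Long> range = FeHBaseTable.Util.getEstimatedRowStats(
        tbl, Bytes.toBytes("row-000"), Bytes.toBytes("row-999"));
    // 'first' is the estimated row count, 'second' the estimated bytes per row;
    // the helper returns (-1, -1) if the estimate fails with an IOException.
    System.out.println("whole table: rows=" + whole.first
        + " bytesPerRow=" + whole.second);
    System.out.println("key range:   rows=" + range.first
        + " bytesPerRow=" + range.second);
  }
}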

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bf4dc2/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
index ceb6e70..5aad812 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
@@ -17,57 +17,22 @@
 
 package org.apache.impala.catalog;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.ClusterStatus;
-import org.apache.hadoop.hbase.HBaseConfiguration;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.ServerLoad;
-import org.apache.hadoop.hbase.RegionLoad;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hive.hbase.HBaseSerDe;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.log4j.Logger;
-
 import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TCatalogObjectType;
-import org.apache.impala.thrift.TColumn;
-import org.apache.impala.thrift.THBaseTable;
 import org.apache.impala.thrift.TResultSet;
-import org.apache.impala.thrift.TResultSetMetadata;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
-import org.apache.impala.util.StatsHelper;
-import org.apache.impala.util.TResultRowBuilder;
 
-import com.codahale.metrics.Timer;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
 
 /**
  * Impala representation of HBase table metadata,
@@ -80,28 +45,8 @@ import com.google.common.collect.Lists;
  * This implies that a "select *"-query on an HBase table
  * will not have the columns ordered as they were declared in the DDL.
  * They will be ordered by family/qualifier.
- *
  */
-public class HBaseTable extends Table {
-  // Maximum deviation from the average to stop querying more regions
-  // to estimate the row count
-  private static final double DELTA_FROM_AVERAGE = 0.15;
-
-  private static final Logger LOG = Logger.getLogger(HBaseTable.class);
-
-  // Copied from Hive's HBaseStorageHandler.java.
-  public static final String DEFAULT_PREFIX = "default.";
-
-  // Number of rows fetched during the row count estimation per region
-  public static final int ROW_COUNT_ESTIMATE_BATCH_SIZE = 10;
-
-  // Minimum number of regions that are checked to estimate the row count
-  private static final int MIN_NUM_REGIONS_TO_CHECK = 5;
-
-  // Name of table in HBase.
-  // 'this.name' is the alias of the HBase table in Hive.
-  protected String hbaseTableName_;
-
+public class HBaseTable extends Table implements FeHBaseTable {
   // Input format class for HBase tables read by Hive.
   private static final String HBASE_INPUT_FORMAT =
       "org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat";
@@ -110,316 +55,69 @@ public class HBaseTable extends Table {
   private static final String HBASE_SERIALIZATION_LIB =
       "org.apache.hadoop.hive.hbase.HBaseSerDe";
 
-  // Storage handler class for HBase tables read by Hive.
-  private static final String HBASE_STORAGE_HANDLER =
-      "org.apache.hadoop.hive.hbase.HBaseStorageHandler";
 
-  // Column family of HBase row key
-  private static final String ROW_KEY_COLUMN_FAMILY = ":key";
+  // Name of table in HBase.
+  // 'this.name' is the alias of the HBase table in Hive.
+  private String hbaseTableName_;
 
-  // Keep the conf around
-  private final static Configuration hbaseConf_ = HBaseConfiguration.create();
 
   // Cached column families. Used primarily for speeding up row stats estimation
   // (see IMPALA-4211).
   private HColumnDescriptor[] columnFamilies_ = null;
 
-  protected HBaseTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
-      Db db, String name, String owner) {
+  protected HBaseTable(org.apache.hadoop.hive.metastore.api.Table msTbl, Db db,
+      String name, String owner) {
     super(msTbl, db, name, owner);
   }
 
   /**
-   * Connection instances are expensive to create. The HBase documentation recommends
-   * one and then sharing it among threads. All operations on a connection are
-   * thread-safe.
-   */
-  private static class ConnectionHolder {
-    private static Connection connection_ = null;
-
-    public static synchronized Connection getConnection(Configuration conf)
-        throws IOException {
-      if (connection_ == null || connection_.isClosed()) {
-        connection_ = ConnectionFactory.createConnection(conf);
-      }
-      return connection_;
-    }
-  }
-
-  /**
-   * Table client objects are thread-unsafe and cheap to create. The HBase docs recommend
-   * creating a new one for each task and then closing when done.
-   */
-  public org.apache.hadoop.hbase.client.Table getHBaseTable() throws IOException {
-    return ConnectionHolder.getConnection(hbaseConf_)
-        .getTable(TableName.valueOf(hbaseTableName_));
-  }
-
-  private void closeHBaseTable(org.apache.hadoop.hbase.client.Table table) {
-    try {
-      table.close();
-    } catch (IOException e) {
-      LOG.error("Error closing HBase table: " + hbaseTableName_, e);
-    }
-  }
-
-  /**
-   * Get the cluster status, making sure we close the admin client afterwards.
-   */
-  public ClusterStatus getClusterStatus() throws IOException {
-    Admin admin = null;
-    ClusterStatus clusterStatus = null;
-    try {
-      Connection connection = ConnectionHolder.getConnection(hbaseConf_);
-      admin = connection.getAdmin();
-      clusterStatus = admin.getClusterStatus();
-    } finally {
-      if (admin != null) admin.close();
-    }
-    return clusterStatus;
-  }
-
-  /**
-   * Parse the column description string to the column families and column
-   * qualifies. This is a copy of HBaseSerDe.parseColumnMapping and
-   * parseColumnStorageTypes with parts we don't use removed. The hive functions
-   * are not public.
-
-   * tableDefaultStorageIsBinary - true if table is default to binary encoding
-   * columnsMappingSpec - input string format describing the table
-   * fieldSchemas - input field schema from metastore table
-   * columnFamilies/columnQualifiers/columnBinaryEncodings - out parameters that will be
-   * filled with the column family, column qualifier and encoding for each column.
+   * Returns true if the given Metastore Table represents an HBase table.
+   * Versions of Hive/HBase are inconsistent about which HBase-related fields are set
+   * (e.g., HIVE-6548 changed the input format to null).
+   * For maximum compatibility, consider all known fields that indicate an HBase table.
    */
-  private void parseColumnMapping(boolean tableDefaultStorageIsBinary,
-      String columnsMappingSpec, List<FieldSchema> fieldSchemas,
-      List<String> columnFamilies, List<String> columnQualifiers,
-      List<Boolean> colIsBinaryEncoded) throws SerDeException {
-    if (columnsMappingSpec == null) {
-      throw new SerDeException(
-          "Error: hbase.columns.mapping missing for this HBase table.");
-    }
-
-    if (columnsMappingSpec.equals("") ||
-        columnsMappingSpec.equals(HBaseSerDe.HBASE_KEY_COL)) {
-      throw new SerDeException("Error: hbase.columns.mapping specifies only "
-          + "the HBase table row key. A valid Hive-HBase table must specify at "
-          + "least one additional column.");
-    }
-
-    int rowKeyIndex = -1;
-    String[] columnSpecs = columnsMappingSpec.split(",");
-    // If there was an implicit key column mapping, the number of columns (fieldSchemas)
-    // will be one more than the number of column mapping specs.
-    int fsStartIdxOffset = fieldSchemas.size() - columnSpecs.length;
-    if (fsStartIdxOffset != 0 && fsStartIdxOffset != 1) {
-      // This should never happen - Hive blocks creating a mismatched table and both Hive
-      // and Impala currently block all column-level DDL on HBase tables.
-      throw new SerDeException(String.format("Number of entries in " +
-          "'hbase.columns.mapping' does not match the number of columns in the " +
-          "table: %d != %d (counting the key if implicit)",
-          columnSpecs.length, fieldSchemas.size()));
-    }
-
-    for (int i = 0; i < columnSpecs.length; ++i) {
-      String mappingSpec = columnSpecs[i];
-      String[] mapInfo = mappingSpec.split("#");
-      // Trim column info so that serdeproperties with new lines still parse correctly.
-      String colInfo = mapInfo[0].trim();
-
-      int idxFirst = colInfo.indexOf(":");
-      int idxLast = colInfo.lastIndexOf(":");
-
-      if (idxFirst < 0 || !(idxFirst == idxLast)) {
-        throw new SerDeException("Error: the HBase columns mapping contains a "
-            + "badly formed column family, column qualifier specification.");
-      }
-
-      if (colInfo.equals(HBaseSerDe.HBASE_KEY_COL)) {
-        Preconditions.checkState(fsStartIdxOffset == 0);
-        rowKeyIndex = i;
-        columnFamilies.add(colInfo);
-        columnQualifiers.add(null);
-      } else {
-        String[] parts = colInfo.split(":");
-        Preconditions.checkState(parts.length > 0 && parts.length <= 2);
-        columnFamilies.add(parts[0]);
-        if (parts.length == 2) {
-          columnQualifiers.add(parts[1]);
-        } else {
-          columnQualifiers.add(null);
-        }
-      }
-
-      // Set column binary encoding
-      FieldSchema fieldSchema = fieldSchemas.get(i + fsStartIdxOffset);
-      boolean supportsBinaryEncoding = supportsBinaryEncoding(fieldSchema);
-      if (mapInfo.length == 1) {
-        // There is no column level storage specification. Use the table storage spec.
-        colIsBinaryEncoded.add(
-            new Boolean(tableDefaultStorageIsBinary && supportsBinaryEncoding));
-      } else if (mapInfo.length == 2) {
-        // There is a storage specification for the column
-        String storageOption = mapInfo[1];
-
-        if (!(storageOption.equals("-") || "string".startsWith(storageOption) || "binary"
-            .startsWith(storageOption))) {
-          throw new SerDeException("Error: A column storage specification is one of"
-              + " the following: '-', a prefix of 'string', or a prefix of 'binary'. "
-              + storageOption + " is not a valid storage option specification for "
-              + fieldSchema.getName());
-        }
-
-        boolean isBinaryEncoded = false;
-        if ("-".equals(storageOption)) {
-          isBinaryEncoded = tableDefaultStorageIsBinary;
-        } else if ("binary".startsWith(storageOption)) {
-          isBinaryEncoded = true;
-        }
-        if (isBinaryEncoded && !supportsBinaryEncoding) {
-          // Use string encoding and log a warning if the column spec is binary but the
-          // column type does not support it.
-          // TODO: Hive/HBase does not raise an exception, but should we?
-          LOG.warn("Column storage specification for column " + fieldSchema.getName()
-              + " is binary" + " but the column type " + fieldSchema.getType() +
-              " does not support binary encoding. Fallback to string format.");
-          isBinaryEncoded = false;
-        }
-        colIsBinaryEncoded.add(isBinaryEncoded);
-      } else {
-        // error in storage specification
-        throw new SerDeException("Error: " + HBaseSerDe.HBASE_COLUMNS_MAPPING
-            + " storage specification " + mappingSpec + " is not valid for column: "
-            + fieldSchema.getName());
-      }
-    }
-
-    if (rowKeyIndex == -1) {
-      columnFamilies.add(0, HBaseSerDe.HBASE_KEY_COL);
-      columnQualifiers.add(0, null);
-      colIsBinaryEncoded.add(0,
-          supportsBinaryEncoding(fieldSchemas.get(0)) && tableDefaultStorageIsBinary);
-    }
-  }
-
-  private boolean supportsBinaryEncoding(FieldSchema fs) {
-    try {
-      Type colType = parseColumnType(fs);
-      // Only boolean, integer and floating point types can use binary storage.
-      return colType.isBoolean() || colType.isIntegerType()
-          || colType.isFloatingPointType();
-    } catch (TableLoadingException e) {
-      return false;
+  public static boolean isHBaseTable(org.apache.hadoop.hive.metastore.api.Table msTbl) {
+    if (msTbl.getParameters() != null &&
+        msTbl.getParameters().containsKey(Util.HBASE_STORAGE_HANDLER)) {
+      return true;
     }
+    StorageDescriptor sd = msTbl.getSd();
+    if (sd == null) return false;
+    if (sd.getInputFormat() != null && sd.getInputFormat().equals(HBASE_INPUT_FORMAT)) {
+      return true;
+    } else return sd.getSerdeInfo() != null &&
+        sd.getSerdeInfo().getSerializationLib() != null &&
+        sd.getSerdeInfo().getSerializationLib().equals(HBASE_SERIALIZATION_LIB);
   }
 
-  @Override
   /**
    * For hbase tables, we can support tables with columns we don't understand at
    * all (e.g. map) as long as the user does not select those. This is in contrast
    * to hdfs tables since we typically need to understand all columns to make sense
    * of the file at all.
    */
+  @Override
   public void load(boolean reuseMetadata, IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
     Preconditions.checkNotNull(getMetaStoreTable());
-    final Timer.Context context =
-        getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
-    try {
+    try (Timer.Context timer = getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time()) {
       msTable_ = msTbl;
-      hbaseTableName_ = getHBaseTableName(getMetaStoreTable());
+      hbaseTableName_ = Util.getHBaseTableName(getMetaStoreTable());
       // Warm up the connection and verify the table exists.
-      getHBaseTable().close();
+      Util.getHBaseTable(hbaseTableName_).close();
       columnFamilies_ = null;
-      Map<String, String> serdeParams =
-          getMetaStoreTable().getSd().getSerdeInfo().getParameters();
-      String hbaseColumnsMapping = serdeParams.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
-      if (hbaseColumnsMapping == null) {
-        throw new MetaException("No hbase.columns.mapping defined in Serde.");
-      }
-
-      String hbaseTableDefaultStorageType = getMetaStoreTable().getParameters().get(
-          HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE);
-      boolean tableDefaultStorageIsBinary = false;
-      if (hbaseTableDefaultStorageType != null &&
-          !hbaseTableDefaultStorageType.isEmpty()) {
-        if (hbaseTableDefaultStorageType.equalsIgnoreCase("binary")) {
-          tableDefaultStorageIsBinary = true;
-        } else if (!hbaseTableDefaultStorageType.equalsIgnoreCase("string")) {
-          throw new SerDeException("Error: " +
-              HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE +
-              " parameter must be specified as" +
-              " 'string' or 'binary'; '" + hbaseTableDefaultStorageType +
-              "' is not a valid specification for this table/serde property.");
-        }
-      }
-
-      // Parse HBase column-mapping string.
-      List<FieldSchema> fieldSchemas = getMetaStoreTable().getSd().getCols();
-      List<String> hbaseColumnFamilies = new ArrayList<String>();
-      List<String> hbaseColumnQualifiers = new ArrayList<String>();
-      List<Boolean> hbaseColumnBinaryEncodings = new ArrayList<Boolean>();
-      parseColumnMapping(tableDefaultStorageIsBinary, hbaseColumnsMapping, fieldSchemas,
-          hbaseColumnFamilies, hbaseColumnQualifiers, hbaseColumnBinaryEncodings);
-      Preconditions.checkState(
-          hbaseColumnFamilies.size() == hbaseColumnQualifiers.size());
-      Preconditions.checkState(fieldSchemas.size() == hbaseColumnFamilies.size());
-
-      // Populate tmp cols in the order they appear in the Hive metastore.
-      // We will reorder the cols below.
-      List<HBaseColumn> tmpCols = Lists.newArrayList();
-      // Store the key column separately.
-      // TODO: Change this to an ArrayList once we support composite row keys.
-      HBaseColumn keyCol = null;
-      for (int i = 0; i < fieldSchemas.size(); ++i) {
-        FieldSchema s = fieldSchemas.get(i);
-        Type t = Type.INVALID;
-        try {
-          t = parseColumnType(s);
-        } catch (TableLoadingException e) {
-          // Ignore hbase types we don't support yet. We can load the metadata
-          // but won't be able to select from it.
-        }
-        HBaseColumn col = new HBaseColumn(s.getName(), hbaseColumnFamilies.get(i),
-            hbaseColumnQualifiers.get(i), hbaseColumnBinaryEncodings.get(i),
-            t, s.getComment(), -1);
-        if (col.getColumnFamily().equals(ROW_KEY_COLUMN_FAMILY)) {
-          // Store the row key column separately from the rest
-          keyCol = col;
-        } else {
-          tmpCols.add(col);
-        }
-      }
-      Preconditions.checkState(keyCol != null);
-
-      // The backend assumes that the row key column is always first and
-      // that the remaining HBase columns are ordered by columnFamily,columnQualifier,
-      // so the final position depends on the other mapped HBase columns.
-      // Sort columns and update positions.
-      Collections.sort(tmpCols);
+      List<Column> cols = Util.loadColumns(msTable_);
       clearColumns();
-
-      keyCol.setPosition(0);
-      addColumn(keyCol);
-      // Update the positions of the remaining columns
-      for (int i = 0; i < tmpCols.size(); ++i) {
-        HBaseColumn col = tmpCols.get(i);
-        col.setPosition(i + 1);
-        addColumn(col);
-      }
-
+      for (Column col : cols) addColumn(col);
       // Set table stats.
       setTableStats(msTable_);
-
       // since we don't support composite hbase rowkeys yet, all hbase tables have a
       // single clustering col
       numClusteringCols_ = 1;
       loadAllColumnStats(client);
     } catch (Exception e) {
-      throw new TableLoadingException("Failed to load metadata for HBase table: " +
-          name_, e);
-    } finally {
-      context.stop();
+      throw new TableLoadingException("Failed to load metadata for HBase table: " + name_,
+          e);
     }
   }
 
@@ -427,233 +125,19 @@ public class HBaseTable extends Table {
   protected void loadFromThrift(TTable table) throws TableLoadingException {
     super.loadFromThrift(table);
     try {
-      hbaseTableName_ = getHBaseTableName(getMetaStoreTable());
+      hbaseTableName_ = Util.getHBaseTableName(getMetaStoreTable());
       // Warm up the connection and verify the table exists.
-      getHBaseTable().close();
+      Util.getHBaseTable(hbaseTableName_).close();
       columnFamilies_ = null;
     } catch (Exception e) {
-      throw new TableLoadingException("Failed to load metadata for HBase table from " +
-          "thrift table: " + name_, e);
-    }
-  }
-
-  /**
-   * This method is completely copied from Hive's HBaseStorageHandler.java.
-   */
-  private String getHBaseTableName(org.apache.hadoop.hive.metastore.api.Table tbl) {
-    // Give preference to TBLPROPERTIES over SERDEPROPERTIES
-    // (really we should only use TBLPROPERTIES, so this is just
-    // for backwards compatibility with the original specs).
-    String tableName = tbl.getParameters().get(HBaseSerDe.HBASE_TABLE_NAME);
-    if (tableName == null) {
-      tableName = tbl.getSd().getSerdeInfo().getParameters().get(
-          HBaseSerDe.HBASE_TABLE_NAME);
-    }
-    if (tableName == null) {
-      tableName = tbl.getDbName() + "." + tbl.getTableName();
-      if (tableName.startsWith(DEFAULT_PREFIX)) {
-        tableName = tableName.substring(DEFAULT_PREFIX.length());
-      }
+      throw new TableLoadingException(
+          "Failed to load metadata for HBase table from thrift table: " + name_, e);
     }
-    return tableName;
   }
 
-  /**
-   * Estimates the number of rows for a single region and returns a pair with
-   * the estimated row count and the estimated size in bytes per row.
-   */
-  private Pair<Long, Long> getEstimatedRowStatsForRegion(HRegionLocation location,
-      boolean isCompressed, ClusterStatus clusterStatus) throws IOException {
-    HRegionInfo info = location.getRegionInfo();
-
-    Scan s = new Scan(info.getStartKey());
-    // Get a small sample of rows
-    s.setBatch(ROW_COUNT_ESTIMATE_BATCH_SIZE);
-    // Try and get every version so the row's size can be used to estimate.
-    s.setMaxVersions(Short.MAX_VALUE);
-    // Don't cache the blocks as we don't think these are
-    // necessarily important blocks.
-    s.setCacheBlocks(false);
-    // Try and get deletes too so their size can be counted.
-    s.setRaw(false);
-
-    org.apache.hadoop.hbase.client.Table table = getHBaseTable();
-    ResultScanner rs = table.getScanner(s);
-
-    long currentRowSize = 0;
-    long currentRowCount = 0;
-
-    try {
-      // Get the the ROW_COUNT_ESTIMATE_BATCH_SIZE fetched rows
-      // for a representative sample
-      for (int i = 0; i < ROW_COUNT_ESTIMATE_BATCH_SIZE; ++i) {
-        Result r = rs.next();
-        if (r == null)
-          break;
-        // Check for empty rows, see IMPALA-1451
-        if (r.isEmpty())
-          continue;
-        ++currentRowCount;
-        // To estimate the number of rows we simply use the amount of bytes
-        // returned from the underlying buffer. Since HBase internally works
-        // with these structures as well this gives us ok estimates.
-        Cell[] cells = r.rawCells();
-        for (Cell c : cells) {
-          if (c instanceof KeyValue) {
-            currentRowSize += KeyValue.getKeyValueDataStructureSize(c.getRowLength(),
-                c.getFamilyLength(), c.getQualifierLength(), c.getValueLength(),
-                c.getTagsLength());
-          } else {
-            throw new IllegalStateException("Celltype " + c.getClass().getName() +
-                " not supported.");
-          }
-        }
-      }
-    } finally {
-      rs.close();
-      closeHBaseTable(table);
-    }
-
-    // If there are no rows then no need to estimate.
-    if (currentRowCount == 0) return new Pair<Long, Long>(0L, 0L);
-    // Get the size.
-    long currentSize = getRegionSize(location, clusterStatus);
-    // estimate the number of rows.
-    double bytesPerRow = currentRowSize / (double) currentRowCount;
-    if (currentSize == 0) {
-      return new Pair<Long, Long>(currentRowCount, (long) bytesPerRow);
-    }
-
-    // Compression factor two is only a best effort guess
-    long estimatedRowCount =
-        (long) ((isCompressed ? 2 : 1) * (currentSize / bytesPerRow));
-
-    return new Pair<Long, Long>(estimatedRowCount, (long) bytesPerRow);
-  }
-
-  /**
-   * Get an estimate of the number of rows and bytes per row in regions between
-   * startRowKey and endRowKey.
-   *
-   * This number is calculated by incrementally checking as many region servers as
-   * necessary until we observe a relatively constant row size per region on average.
-   * Depending on the skew of data in the regions this can either mean that we need
-   * to check only a minimal number of regions or that we will scan all regions.
-   *
-   * The HBase region servers periodically update the master with their metrics,
-   * including storefile size. We get the size of the storefiles for all regions in
-   * the cluster with a single call to getClusterStatus from the master.
-   *
-   * The accuracy of this number is determined by the number of rows that are written
-   * and kept in the memstore and have not been flushed until now. A large number
-   * of key-value pairs in the memstore will lead to bad estimates as this number
-   * is not reflected in the storefile size that is used to estimate this number.
-   *
-   * Currently, the algorithm does not consider the case that the key range used as a
-   * parameter might be generally of different size than the rest of the region.
-   *
-   * The values computed here should be cached so that in high qps workloads
-   * the nn is not overwhelmed. Could be done in load(); Synchronized to make
-   * sure that only one thread at a time is using the htable.
-   *
-   * @param startRowKey
-   *          First row key in the range
-   * @param endRowKey
-   *          Last row key in the range
-   * @return The estimated number of rows in the regions between the row keys (first) and
-   *         the estimated row size in bytes (second).
-   */
   public synchronized Pair<Long, Long> getEstimatedRowStats(byte[] startRowKey,
       byte[] endRowKey) {
-    Preconditions.checkNotNull(startRowKey);
-    Preconditions.checkNotNull(endRowKey);
-
-    boolean isCompressed = false;
-    long rowCount = 0;
-    long rowSize = 0;
-
-    org.apache.hadoop.hbase.client.Table table = null;
-    try {
-      table = getHBaseTable();
-      ClusterStatus clusterStatus = getClusterStatus();
-
-      // Check to see if things are compressed.
-      // If they are we'll estimate a compression factor.
-      if (columnFamilies_ == null) {
-        columnFamilies_ = table.getTableDescriptor().getColumnFamilies();
-      }
-      Preconditions.checkNotNull(columnFamilies_);
-      for (HColumnDescriptor desc : columnFamilies_) {
-        isCompressed |= desc.getCompression() !=  Compression.Algorithm.NONE;
-      }
-
-      // Fetch all regions for the key range
-      List<HRegionLocation> locations = getRegionsInRange(table, startRowKey, endRowKey);
-      Collections.shuffle(locations);
-      // The following variables track the number and size of 'rows' in
-      // HBase and allow incremental calculation of the average and standard
-      // deviation.
-      StatsHelper<Long> statsSize = new StatsHelper<Long>();
-      long totalEstimatedRows = 0;
-
-      // Collects stats samples from at least MIN_NUM_REGIONS_TO_CHECK
-      // and at most all regions until the delta is small enough.
-      while ((statsSize.count() < MIN_NUM_REGIONS_TO_CHECK ||
-          statsSize.stddev() > statsSize.mean() * DELTA_FROM_AVERAGE) &&
-          statsSize.count() < locations.size()) {
-        HRegionLocation currentLocation = locations.get((int) statsSize.count());
-        Pair<Long, Long> tmp = getEstimatedRowStatsForRegion(currentLocation,
-            isCompressed, clusterStatus);
-        totalEstimatedRows += tmp.first;
-        statsSize.addSample(tmp.second);
-      }
-
-      // Sum up the total size for all regions in range.
-      long totalSize = 0;
-      for (final HRegionLocation location : locations) {
-        totalSize += getRegionSize(location, clusterStatus);
-      }
-      if (totalSize == 0) {
-        rowCount = totalEstimatedRows;
-      } else {
-        rowCount = (long) (totalSize / statsSize.mean());
-      }
-      rowSize = (long) statsSize.mean();
-    } catch (IOException ioe) {
-      // Print the stack trace, but we'll ignore it
-      // as this is just an estimate.
-      // TODO: Put this into the per query log.
-      LOG.error("Error computing HBase row count estimate", ioe);
-      return new Pair<Long, Long>(-1l, -1l);
-    } finally {
-      if (table != null) closeHBaseTable(table);
-    }
-    return new Pair<Long, Long>(rowCount, rowSize);
-  }
-
-  /**
-   * Returns the size of the given region in bytes. Simply returns the storefile size
-   * for this region from the ClusterStatus. Returns 0 in case of an error.
-   */
-  public long getRegionSize(HRegionLocation location, ClusterStatus clusterStatus) {
-    HRegionInfo info = location.getRegionInfo();
-    ServerLoad serverLoad = clusterStatus.getLoad(location.getServerName());
-
-    // If the serverLoad is null, the master doesn't have information for this region's
-    // server. This shouldn't normally happen.
-    if (serverLoad == null) {
-      LOG.error("Unable to find server load for server: " + location.getServerName() +
-          " for location " + info.getRegionNameAsString());
-      return 0;
-    }
-    RegionLoad regionLoad = serverLoad.getRegionsLoad().get(info.getRegionName());
-    if (regionLoad == null) {
-      LOG.error("Unable to find regions load for server: " + location.getServerName() +
-          " for location " + info.getRegionNameAsString());
-      return 0;
-    }
-    final long megaByte = 1024L * 1024L;
-    return regionLoad.getStorefileSizeMB() * megaByte;
+    return Util.getEstimatedRowStats(this, startRowKey, endRowKey);
   }
 
   /**
@@ -665,11 +149,12 @@ public class HBaseTable extends Table {
   }
 
   @Override
-  public TTableDescriptor toThriftDescriptor(int tableId, Set<Long> referencedPartitions) {
+  public TTableDescriptor toThriftDescriptor(int tableId,
+      Set<Long> referencedPartitions) {
     TTableDescriptor tableDescriptor =
-        new TTableDescriptor(tableId, TTableType.HBASE_TABLE,
-            getTColumnDescriptors(), numClusteringCols_, hbaseTableName_, db_.getName());
-    tableDescriptor.setHbaseTable(getTHBaseTable());
+        new TTableDescriptor(tableId, TTableType.HBASE_TABLE, getTColumnDescriptors(),
+            numClusteringCols_, hbaseTableName_, db_.getName());
+    tableDescriptor.setHbaseTable(Util.getTHBaseTable(this));
     return tableDescriptor;
   }
 
@@ -678,6 +163,11 @@ public class HBaseTable extends Table {
   }
 
   @Override
+  public TResultSet getTableStats() {
+    return Util.getTableStats(this);
+  }
+
+  @Override
   public TCatalogObjectType getCatalogObjectType() {
     return TCatalogObjectType.TABLE;
   }
@@ -686,161 +176,29 @@ public class HBaseTable extends Table {
   public TTable toThrift() {
     TTable table = super.toThrift();
     table.setTable_type(TTableType.HBASE_TABLE);
-    table.setHbase_table(getTHBaseTable());
+    table.setHbase_table(Util.getTHBaseTable(this));
     return table;
   }
 
-  private THBaseTable getTHBaseTable() {
-    THBaseTable tHbaseTable = new THBaseTable();
-    tHbaseTable.setTableName(hbaseTableName_);
-    for (Column c : getColumns()) {
-      HBaseColumn hbaseCol = (HBaseColumn) c;
-      tHbaseTable.addToFamilies(hbaseCol.getColumnFamily());
-      if (hbaseCol.getColumnQualifier() != null) {
-        tHbaseTable.addToQualifiers(hbaseCol.getColumnQualifier());
-      } else {
-        tHbaseTable.addToQualifiers("");
-      }
-      tHbaseTable.addToBinary_encoded(hbaseCol.isBinaryEncoded());
-    }
-    return tHbaseTable;
-  }
-
-  /**
-   * Get the corresponding regions for an arbitrary range of keys.
-   * This is copied from org.apache.hadoop.hbase.client.HTable in HBase 0.95. The
-   * differences are:
-   * 1. It does not use cache when calling getRegionLocation.
-   * 2. It is synchronized on hbaseTbl.
-   *
-   * @param startKey
-   *          Starting key in range, inclusive
-   * @param endKey
-   *          Ending key in range, exclusive
-   * @return A list of HRegionLocations corresponding to the regions that
-   *         contain the specified range
-   * @throws IOException
-   *           if a remote or network exception occurs
-   */
-  public static List<HRegionLocation> getRegionsInRange(
-      org.apache.hadoop.hbase.client.Table hbaseTbl,
-      final byte[] startKey, final byte[] endKey) throws IOException {
-    final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
-    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
-      throw new IllegalArgumentException("Invalid range: " +
-          Bytes.toStringBinary(startKey) + " > " + Bytes.toStringBinary(endKey));
-    }
-    final List<HRegionLocation> regionList = new ArrayList<HRegionLocation>();
-    byte[] currentKey = startKey;
-    Connection connection = ConnectionHolder.getConnection(hbaseConf_);
-    // Make sure only one thread is accessing the hbaseTbl.
-    synchronized (hbaseTbl) {
-      RegionLocator locator = connection.getRegionLocator(hbaseTbl.getName());
-      do {
-        // always reload region location info.
-        HRegionLocation regionLocation = locator.getRegionLocation(currentKey, true);
-        regionList.add(regionLocation);
-        currentKey = regionLocation.getRegionInfo().getEndKey();
-      } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) &&
-          (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0));
-    }
-    return regionList;
-  }
-
   /**
    * Returns the storage handler class for HBase tables read by Hive.
    */
   @Override
   public String getStorageHandlerClassName() {
-    return HBASE_STORAGE_HANDLER;
+    return Util.HBASE_STORAGE_HANDLER;
   }
 
   /**
-   * Returns statistics on this table as a tabular result set. Used for the
-   * SHOW TABLE STATS statement. The schema of the returned TResultSet is set
-   * inside this method.
+   * Fetch or use cached column families.
    */
-  public TResultSet getTableStats() {
-    TResultSet result = new TResultSet();
-    TResultSetMetadata resultSchema = new TResultSetMetadata();
-    result.setSchema(resultSchema);
-    resultSchema.addToColumns(
-        new TColumn("Region Location", Type.STRING.toThrift()));
-    resultSchema.addToColumns(new TColumn("Start RowKey",
-        Type.STRING.toThrift()));
-    resultSchema.addToColumns(new TColumn("Est. #Rows", Type.BIGINT.toThrift()));
-    resultSchema.addToColumns(new TColumn("Size", Type.STRING.toThrift()));
-
-    org.apache.hadoop.hbase.client.Table table;
-    try {
-      table = getHBaseTable();
-    } catch (IOException e) {
-      LOG.error("Error getting HBase table " + hbaseTableName_, e);
-      throw new RuntimeException(e);
-    }
-
-    // TODO: Consider fancier stats maintenance techniques for speeding up this process.
-    // Currently, we list all regions and perform a mini-scan of each of them to
-    // estimate the number of rows, the data size, etc., which is rather expensive.
-    try {
-      ClusterStatus clusterStatus = getClusterStatus();
-      long totalNumRows = 0;
-      long totalSize = 0;
-      List<HRegionLocation> regions = HBaseTable.getRegionsInRange(table,
-          HConstants.EMPTY_END_ROW, HConstants.EMPTY_START_ROW);
-      for (HRegionLocation region : regions) {
-        TResultRowBuilder rowBuilder = new TResultRowBuilder();
-        HRegionInfo regionInfo = region.getRegionInfo();
-        Pair<Long, Long> estRowStats =
-            getEstimatedRowStatsForRegion(region, false, clusterStatus);
-
-        long numRows = estRowStats.first.longValue();
-        long regionSize = getRegionSize(region, clusterStatus);
-        totalNumRows += numRows;
-        totalSize += regionSize;
-
-        // Add the region location, start rowkey, number of rows and raw size.
-        rowBuilder.add(String.valueOf(region.getHostname()))
-            .add(Bytes.toString(regionInfo.getStartKey())).add(numRows)
-            .addBytes(regionSize);
-        result.addToRows(rowBuilder.get());
-      }
-
-      // Total num rows and raw region size.
-      if (regions.size() > 1) {
-        TResultRowBuilder rowBuilder = new TResultRowBuilder();
-        rowBuilder.add("Total").add("").add(totalNumRows).addBytes(totalSize);
-        result.addToRows(rowBuilder.get());
+  @Override
+  public HColumnDescriptor[] getColumnFamilies() throws IOException {
+    if (columnFamilies_ == null) {
+      try (org.apache.hadoop.hbase.client.Table hBaseTable = Util
+          .getHBaseTable(getHBaseTableName())) {
+        columnFamilies_ = hBaseTable.getTableDescriptor().getColumnFamilies();
       }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    } finally {
-      closeHBaseTable(table);
-    }
-    return result;
-  }
-
-  /**
-   * Returns true if the given Metastore Table represents an HBase table.
-   * Versions of Hive/HBase are inconsistent which HBase related fields are set
-   * (e.g., HIVE-6548 changed the input format to null).
-   * For maximum compatibility consider all known fields that indicate an HBase table.
-   */
-  public static boolean isHBaseTable(
-      org.apache.hadoop.hive.metastore.api.Table msTbl) {
-    if (msTbl.getParameters() != null &&
-        msTbl.getParameters().containsKey(HBASE_STORAGE_HANDLER)) {
-      return true;
-    }
-    StorageDescriptor sd = msTbl.getSd();
-    if (sd == null) return false;
-    if (sd.getInputFormat() != null && sd.getInputFormat().equals(HBASE_INPUT_FORMAT)) {
-      return true;
-    } else if (sd.getSerdeInfo() != null &&
-        sd.getSerdeInfo().getSerializationLib() != null &&
-        sd.getSerdeInfo().getSerializationLib().equals(HBASE_SERIALIZATION_LIB)) {
-      return true;
     }
-    return false;
+    return columnFamilies_;
   }
 }
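
The effect of the refactor is that frontend code can be written purely against the
FeHBaseTable interface and works unchanged for both the catalogd-backed HBaseTable above
and the new LocalCatalog-backed LocalHbaseTable. Below is a minimal sketch under the same
assumption that Util is nested in FeHBaseTable; the class FeHBaseTableExample and method
describe are hypothetical, and the getFamilies() call assumes the standard Thrift-generated
accessor for the THBaseTable list field.

import java.io.IOException;

import org.apache.impala.catalog.FeHBaseTable;
import org.apache.impala.thrift.THBaseTable;
import org.apache.impala.thrift.TResultSet;

public class FeHBaseTableExample {
  // Builds the SHOW TABLE STATS result set for any FeHBaseTable implementation.
  public static TResultSet describe(FeHBaseTable tbl) throws IOException {
    // Underlying HBase table name, which may differ from the Hive-level alias.
    String hbaseName = tbl.getHBaseTableName();
    // Column families; implementations may serve these from a cache (IMPALA-4211).
    int numFamilies = tbl.getColumnFamilies().length;
    System.out.println(hbaseName + " has " + numFamilies + " column families");
    // Thrift structure sent to the backend when the table is serialized.
    THBaseTable thriftTbl = FeHBaseTable.Util.getTHBaseTable(tbl);
    System.out.println("mapped columns: " + thriftTbl.getFamilies().size());
    // Region-by-region stats, built by the shared helper.
    return FeHBaseTable.Util.getTableStats(tbl);
  }
}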


[4/5] impala git commit: IMPALA-7397: fix DCHECK in impala::AggregationNode::Close

Posted by ta...@apache.org.
IMPALA-7397: fix DCHECK in impala::AggregationNode::Close

As part of IMPALA-110, all of the logic of performing aggregations was
refactored out of the aggregation ExecNode and into Aggregators. Each
Aggregator manages its own memory, so a DCHECK was added in
AggregationNode::Close to ensure that no allocations were
made from the regular ExecNode mem pools.

This DCHECK is violated if the node was assigned conjuncts that
allocate memory in Prepare: even though the conjuncts are evaluated
in the Aggregator, we still initialize them in ExecNode::Prepare.

This patch solves the problem by creating a copy of the TPlanNode
without the conjuncts to pass to ExecNode. In the future, when
TAggregator is introduced, we can get rid of this by directly
assigning conjuncts to Aggregators.

Note that this doesn't affect StreamingAggregationNode, which never
has conjuncts assigned to it, but this patch also fixes an incorrect
DCHECK that was meant to enforce this.

Testing:
- Added a regression test for this case.

Change-Id: I65ae00ac23a62ab9f4a7ff06ac9ad5cece80c39d
Reviewed-on: http://gerrit.cloudera.org:8080/11132
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/70bf9ea2
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/70bf9ea2
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/70bf9ea2

Branch: refs/heads/master
Commit: 70bf9ea29636a6fbecfd7a003cc746d3a0046edb
Parents: cf5de09
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Mon Aug 6 19:06:40 2018 +0000
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Mon Aug 6 23:45:08 2018 +0000

----------------------------------------------------------------------
 be/src/exec/aggregation-node.cc                              | 6 +++++-
 be/src/exec/streaming-aggregation-node.cc                    | 2 +-
 .../functional-query/queries/QueryTest/aggregation.test      | 8 ++++++++
 3 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/70bf9ea2/be/src/exec/aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc
index 2c95590..9f76768 100644
--- a/be/src/exec/aggregation-node.cc
+++ b/be/src/exec/aggregation-node.cc
@@ -38,7 +38,11 @@ AggregationNode::AggregationNode(
   : ExecNode(pool, tnode, descs) {}
 
 Status AggregationNode::Init(const TPlanNode& tnode, RuntimeState* state) {
-  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
+  // The conjuncts will be evaluated in the Aggregator, so don't pass them to the
+  // ExecNode. TODO: remove this once we assign conjuncts directly to Aggregators.
+  TPlanNode tnode_no_conjuncts(tnode);
+  tnode_no_conjuncts.__set_conjuncts(std::vector<TExpr>());
+  RETURN_IF_ERROR(ExecNode::Init(tnode_no_conjuncts, state));
   if (tnode.agg_node.grouping_exprs.empty()) {
     aggregator_.reset(new NonGroupingAggregator(this, pool_, tnode, state->desc_tbl()));
   } else {

http://git-wip-us.apache.org/repos/asf/impala/blob/70bf9ea2/be/src/exec/streaming-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/streaming-aggregation-node.cc b/be/src/exec/streaming-aggregation-node.cc
index c1e9184..7280849 100644
--- a/be/src/exec/streaming-aggregation-node.cc
+++ b/be/src/exec/streaming-aggregation-node.cc
@@ -33,7 +33,7 @@ namespace impala {
 StreamingAggregationNode::StreamingAggregationNode(
     ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
   : ExecNode(pool, tnode, descs), child_eos_(false) {
-  DCHECK(conjunct_evals_.empty()) << "Preaggs have no conjuncts";
+  DCHECK(tnode.conjuncts.empty()) << "Preaggs have no conjuncts";
   DCHECK(!tnode.agg_node.grouping_exprs.empty()) << "Streaming preaggs do grouping";
   DCHECK(limit_ == -1) << "Preaggs have no limits";
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/70bf9ea2/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/aggregation.test b/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
index 88dd97c..7aba138 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
@@ -1370,3 +1370,11 @@ from x
 ---- TYPES
 DOUBLE,DOUBLE,DOUBLE,DOUBLE
 ====
+---- QUERY
+# IMPALA-7397: conjunct that makes allocations in Prepare (extract) assigned to agg node
+select timestamp_col, count(int_col) from alltypesagg group by timestamp_col, int_col
+having extract(hour from timestamp_col) = int_col
+---- TYPES
+TIMESTAMP,BIGINT
+---- RESULTS
+====


[3/5] impala git commit: IMPALA-7385: Fix test-with-docker errors having to do with time zones.

Posted by ta...@apache.org.
IMPALA-7385: Fix test-with-docker errors having to do with time zones.

ExprTest.TimestampFunctions,
query_test.test_scanners.TestOrc.test_type_conversions, and
query_test.test_queries.TestHdfsQueries.test_hdfs_scan_node were all
failing when using test-with-docker with mismatched dates.

As it turns out, there is code that calls readlink(/etc/localtime)
and parses the output to identify the current timezone name.
This is described in localtime(5) on Ubuntu 16.04:

  It should be an absolute or relative symbolic link pointing to
  /usr/share/zoneinfo/, followed by a timezone identifier such as
  "Europe/Berlin" or "Etc/UTC". ...  Because the timezone identifier is
  extracted from the symlink target name of /etc/localtime, this file
  may not be a normal file or hardlink."

To honor this requirement, and to make the tests pass, I re-jiggered
how I pass the time zone information from the host into the container.

The previously failing tests now pass.

Change-Id: Ia9facfd9741806e7dbb868d8d06d9296bf86e52f
Reviewed-on: http://gerrit.cloudera.org:8080/11106
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/cf5de097
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/cf5de097
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/cf5de097

Branch: refs/heads/master
Commit: cf5de09761c21aee4f3d571a94fbee5bda306a97
Parents: e6bf4dc
Author: Philip Zeyliger <ph...@cloudera.com>
Authored: Mon Jul 30 16:43:09 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Mon Aug 6 22:41:02 2018 +0000

----------------------------------------------------------------------
 docker/entrypoint.sh       | 28 +++++++++-------------------
 docker/test-with-docker.py | 10 ++++++++--
 2 files changed, 17 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/cf5de097/docker/entrypoint.sh
----------------------------------------------------------------------
diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh
index 382ca65..0e192c9 100755
--- a/docker/entrypoint.sh
+++ b/docker/entrypoint.sh
@@ -349,27 +349,17 @@ function test_suite() {
   return $ret
 }
 
-# Ubuntu's tzdata package is very finnicky, and if you
-# mount /etc/localtime from the host to the container directly,
-# it fails to install. However, if you make it a symlink
-# and configure /etc/timezone to something that's not an
-# empty string, you'll get the right behavior.
-#
-# The post installation script is findable by looking for "tzdata.postinst"
-#
-# Use this command to reproduce the Ubuntu issue:
-#   docker run -v /etc/localtime:/mnt/localtime -ti ubuntu:16.04 bash -c '
-#     date
-#     ln -sf /mnt/localtime /etc/localtime
-#     date +%Z > /etc/timezone
-#     date
-#     apt-get update > /dev/null
-#     apt-get install tzdata
-#     date'
+# It's convenient (for log files to be legible) for the container
+# to have the host time zone. However, /etc/localtime is finicky
+# (see localtime(5)): bind-mounting the host's /etc/localtime into the
+# container, or symlinking to it, doesn't always work. Instead, we expect
+# $LOCALTIME_LINK_TARGET to be set to a path in /usr/share/zoneinfo.
 function configure_timezone() {
-  if ! diff -q /etc/localtime /mnt/localtime 2> /dev/null; then
-    ln -sf /mnt/localtime /etc/localtime
+  if [ -e "${LOCALTIME_LINK_TARGET}" ]; then
+    ln -sf "${LOCALTIME_LINK_TARGET}" /etc/localtime
     date +%Z > /etc/timezone
+  else
+    echo '$LOCALTIME_LINK_TARGET not configured.' 1>&2
   fi
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/cf5de097/docker/test-with-docker.py
----------------------------------------------------------------------
diff --git a/docker/test-with-docker.py b/docker/test-with-docker.py
index 3da700f..dc3bf95 100755
--- a/docker/test-with-docker.py
+++ b/docker/test-with-docker.py
@@ -499,6 +499,12 @@ class TestWithDocker(object):
     if self.test_mode:
       extras = ["-e", "TEST_TEST_WITH_DOCKER=true"] + extras
 
+    # According to localtime(5), /etc/localtime is supposed
+    # to be a symlink to somewhere inside /usr/share/zoneinfo
+    assert os.path.islink("/etc/localtime")
+    localtime_link_target = os.path.realpath("/etc/localtime")
+    assert localtime_link_target.startswith("/usr/share/zoneinfo")
+
     container_id = _check_output([
         "docker", "create",
         # Required for some of the ntp handling in bootstrap and Kudu;
@@ -526,9 +532,9 @@ class TestWithDocker(object):
         "-v", self.git_root + ":/repo:ro",
         "-v", self.git_common_dir + ":/git_common_dir:ro",
         "-e", "GIT_HEAD_REV=" + self.git_head_rev,
-        "-v", self.ccache_dir + ":/ccache",
         # Share timezone between host and container
-        "-v", "/etc/localtime:/mnt/localtime",
+        "-e", "LOCALTIME_LINK_TARGET=" + localtime_link_target,
+        "-v", self.ccache_dir + ":/ccache",
         "-v", _make_dir_if_not_exist(self.log_dir,
                                      logdir) + ":/logs",
         "-v", base + ":/mnt/base:ro"]


[5/5] impala git commit: IMPALA-7242/IMPALA-7243: check for overflow when converting IntVal to DecimalValue

Posted by ta...@apache.org.
IMPALA-7242/IMPALA-7243: check for overflow when converting IntVal to DecimalValue

This change adds a check for an overflow that can happen when converting
num_buckets from IntVal to a DecimalValue during width_bucket evaluation.
Such an overflow either hits a DCHECK, if the evaluation needs int256 for
intermediate results, or otherwise returns a wrong result.

Change-Id: Ie5808802096bb0e6cfd4ab6464538474e42cab60
Reviewed-on: http://gerrit.cloudera.org:8080/11020
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/df3f1658
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/df3f1658
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/df3f1658

Branch: refs/heads/master
Commit: df3f1658461553a174dd27fce2edce8594aefd16
Parents: 70bf9ea
Author: aphadke <ap...@cloudera.com>
Authored: Tue Jul 10 15:39:45 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Aug 7 00:36:15 2018 +0000

----------------------------------------------------------------------
 be/src/exprs/expr-test.cc         | 11 ++++++++++-
 be/src/exprs/math-functions-ir.cc |  7 +++++++
 2 files changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/df3f1658/be/src/exprs/expr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index e03f458..b50ba23 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -5379,7 +5379,16 @@ TEST_F(ExprTest, MathFunctions) {
   // max and min value that would require int256_t for evaluation
   TestValue("width_bucket(10000000000000000000000000000000000000, 1,"
             "99999999999999999999999999999999999999, 15)", TYPE_BIGINT, 2);
-
+  // IMPALA-7242/IMPALA-7243: check for overflow when converting IntVal to DecimalValue
+  TestErrorString("width_bucket(cast(-0.10 as decimal(37,30)), cast(-0.36028797018963968 "
+      "as decimal(25,25)), cast(9151517.4969773200562764155787276999832"
+      "as decimal(38,31)), 1328180220)",
+      "UDF ERROR: Overflow while representing the num_buckets:1328180220 as a "
+      "DecimalVal\n");
+  TestErrorString("width_bucket(cast(9 as decimal(10,7)), cast(-60000 as decimal(11,6)), "
+      "cast(10 as decimal(7,5)), 249895273);",
+      "UDF ERROR: Overflow while representing the num_buckets:249895273 as a "
+      "DecimalVal\n");
   // Run twice to test deterministic behavior.
   for (uint32_t seed : {0, 1234}) {
     stringstream rand, random;

http://git-wip-us.apache.org/repos/asf/impala/blob/df3f1658/be/src/exprs/math-functions-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/math-functions-ir.cc b/be/src/exprs/math-functions-ir.cc
index 91af3f4..28c9e1e 100644
--- a/be/src/exprs/math-functions-ir.cc
+++ b/be/src/exprs/math-functions-ir.cc
@@ -537,6 +537,13 @@ BigIntVal MathFunctions::WidthBucketImpl(FunctionContext* ctx,
   Decimal16Value buckets = Decimal16Value::FromInt(input_precision, input_scale,
       num_buckets.val, &overflow);
 
+  if (UNLIKELY(overflow)) {
+    stringstream error_msg;
+    error_msg << "Overflow while representing the num_buckets:" << num_buckets.val
+        << " as a DecimalVal";
+    ctx->SetError(error_msg.str().c_str());
+    return BigIntVal::null();
+  }
   bool needs_int256 = false;
   // Check if dist_from_min * buckets would overflow and if there is a need to
   // store the intermediate results in int256_t to avoid an overflow